NOISSUE - Switch to / delimiter (#3424)

Signed-off-by: dusan <borovcanindusan1@gmail.com>
This commit is contained in:
Dušan Borovčanin
2026-04-03 18:37:00 +02:00
committed by GitHub
parent 6a3319828e
commit 791e084de6
46 changed files with 603 additions and 407 deletions
+1 -1
View File
@@ -17,7 +17,7 @@ import (
)
const (
AllTopic = "alarms.>"
AllTopic = "alarms/#"
prefix = "alarms"
)
+2 -2
View File
@@ -155,7 +155,7 @@ components:
description: An id of the owner who created subscription.
topic:
type: string
example: topic.subtopic
example: topic/subtopic
description: Topic to which the user subscribes.
contact:
type: string
@@ -166,7 +166,7 @@ components:
properties:
topic:
type: string
example: topic.subtopic
example: topic/subtopic
description: Topic to which the user subscribes.
contact:
type: string
+1 -1
View File
@@ -269,7 +269,7 @@ supermq-cli channels connections <channel_id> <user_token>
#### Send a message over HTTP
```bash
supermq-cli messages send <domain_id> <channel_id.subtopic> <secret> '[{"bn":"Dev1","n":"temp","v":20}, {"n":"hum","v":40}, {"bn":"Dev2", "n":"temp","v":20}, {"n":"hum","v":40}]'
supermq-cli messages send <domain_id> <channel_id/subtopic> <secret> '[{"bn":"Dev1","n":"temp","v":20}, {"n":"hum","v":40}, {"bn":"Dev2", "n":"temp","v":20}, {"n":"hum","v":40}]'
```
### Groups
+1 -1
View File
@@ -7,7 +7,7 @@ import "github.com/spf13/cobra"
var cmdMessages = []cobra.Command{
{
Use: "send <domain_id> <channel_id.subtopic> <JSON_string> <secret>",
Use: "send <domain_id> <channel_id/subtopic> <JSON_string> <secret>",
Short: "Send messages",
Long: `Sends message on the channel`,
Run: func(cmd *cobra.Command, args []string) {
+3 -3
View File
@@ -35,9 +35,9 @@ func TestSendMesageCmd(t *testing.T) {
{
desc: "send message successfully",
args: []string{
domainID,
channel.ID,
message,
domainID,
client.Credentials.Secret,
},
logType: okLog,
@@ -45,10 +45,10 @@ func TestSendMesageCmd(t *testing.T) {
{
desc: "send message with invalid args",
args: []string{
domainID,
channel.ID,
message,
client.Credentials.Secret,
domainID,
extraArg,
},
logType: usageLog,
@@ -56,9 +56,9 @@ func TestSendMesageCmd(t *testing.T) {
{
desc: "send message with invalid client secret",
args: []string{
domainID,
channel.ID,
message,
domainID,
"invalid_secret",
},
sdkErr: errors.NewSDKErrorWithStatus(errors.Wrap(svcerr.ErrAuthentication, errors.Wrap(svcerr.ErrAuthorization, svcerr.ErrNotFound)), http.StatusBadRequest),
+3
View File
@@ -208,6 +208,9 @@ func (svc service) UpdateSecret(ctx context.Context, session authn.Session, id,
if err != nil {
return Client{}, errors.Wrap(svcerr.ErrUpdateEntity, err)
}
if err := svc.cache.Remove(ctx, client.ID); err != nil {
return client, errors.Wrap(svcerr.ErrRemoveEntity, err)
}
return client, nil
}
+23
View File
@@ -707,6 +707,7 @@ func TestUpdateSecret(t *testing.T) {
updateSecretResponse clients.Client
session smqauthn.Session
updateErr error
removeErr error
err error
}{
{
@@ -732,15 +733,37 @@ func TestUpdateSecret(t *testing.T) {
updateErr: repoerr.ErrMalformedEntity,
err: svcerr.ErrUpdateEntity,
},
{
desc: "update client secret with failed to remove cache",
client: client,
newSecret: "newSecret",
session: smqauthn.Session{UserID: validID},
updateSecretResponse: clients.Client{
ID: client.ID,
Credentials: clients.Credentials{
Identity: client.Credentials.Identity,
Secret: "newSecret",
},
},
removeErr: repoerr.ErrRemoveEntity,
err: svcerr.ErrRemoveEntity,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
repoCall := repo.On("UpdateSecret", context.Background(), mock.Anything).Return(tc.updateSecretResponse, tc.updateErr)
var cacheCall *mock.Call
if tc.updateErr == nil {
cacheCall = cache.On("Remove", context.Background(), tc.updateSecretResponse.ID).Return(tc.removeErr)
}
updatedClient, err := svc.UpdateSecret(context.Background(), tc.session, tc.client.ID, tc.newSecret)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
assert.Equal(t, tc.updateSecretResponse, updatedClient, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.updateSecretResponse, updatedClient))
repoCall.Unset()
if cacheCall != nil {
cacheCall.Unset()
}
})
}
}
+1 -1
View File
@@ -92,7 +92,7 @@ type config struct {
StandaloneID string `env:"MG_CLIENTS_STANDALONE_ID" envDefault:""`
StandaloneToken string `env:"MG_CLIENTS_STANDALONE_TOKEN" envDefault:""`
CacheURL string `env:"MG_CLIENTS_CACHE_URL" envDefault:"redis://localhost:6379/0"`
CacheKeyDuration time.Duration `env:"MG_CLIENTS_CACHE_KEY_DURATION" envDefault:"10m"`
CacheKeyDuration time.Duration `env:"MG_CLIENTS_CACHE_KEY_DURATION" envDefault:"1h"`
JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"`
ESURL string `env:"MG_ES_URL" envDefault:"amqp://guest:guest@localhost:5682/"`
+1 -1
View File
@@ -65,7 +65,7 @@ curl -X POST http://localhost:9014/subscriptions \
-H "Authorization: Bearer <user_access_token>" \
-H "Content-Type: application/json" \
-d '{
"topic": "some.topic.subtopic",
"topic": "some/topic/subtopic",
"contact": "user@example.com"
}'
```
+60 -60
View File
@@ -10,15 +10,15 @@ The service is configured using environment variables. Values shown are from [do
Used by `consumers/notifiers/smtp` via `internal/email`.
| Variable | Description | Default |
| --- | --- | --- |
| `MG_EMAIL_HOST` | SMTP host | `smtp.mailtrap.io` |
| `MG_EMAIL_PORT` | SMTP port | `2525` |
| `MG_EMAIL_USERNAME` | SMTP username | `18bf7f70705139` |
| `MG_EMAIL_PASSWORD` | SMTP password | `2b0d302e775b1e` |
| Variable | Description | Default |
| ----------------------- | ---------------------------------------------- | ------------------ |
| `MG_EMAIL_HOST` | SMTP host | `smtp.mailtrap.io` |
| `MG_EMAIL_PORT` | SMTP port | `2525` |
| `MG_EMAIL_USERNAME` | SMTP username | `18bf7f70705139` |
| `MG_EMAIL_PASSWORD` | SMTP password | `2b0d302e775b1e` |
| `MG_EMAIL_FROM_ADDRESS` | Default from address (used if `from` is empty) | `from@example.com` |
| `MG_EMAIL_FROM_NAME` | Default from name | `Example` |
| `MG_EMAIL_TEMPLATE` | Email template path | `email.tmpl` |
| `MG_EMAIL_FROM_NAME` | Default from name | `Example` |
| `MG_EMAIL_TEMPLATE` | Email template path | `email.tmpl` |
### SMPP notifier (SMS)
@@ -26,16 +26,16 @@ Used by `consumers/notifiers/smtp` via `internal/email`.
Defined in `consumers/notifiers/smpp/config.go`.
| Variable | Description | Default |
| --- | --- | --- |
| `MG_SMPP_ADDRESS` | SMPP address in `host:port` format | "" |
| `MG_SMPP_USERNAME` | SMPP username | "" |
| `MG_SMPP_PASSWORD` | SMPP password | "" |
| `MG_SMPP_SYSTEM_TYPE` | SMPP system type | "" |
| `MG_SMPP_SRC_ADDR_TON` | SMPP source address TON | `0` |
| `MG_SMPP_DST_ADDR_TON` | SMPP source address NPI | `0` |
| `MG_SMPP_SRC_ADDR_NPI` | SMPP destination address TON | `0` |
| `MG_SMPP_DST_ADDR_NPI` | SMPP destination address NPI | `0` |
| Variable | Description | Default |
| ---------------------- | ---------------------------------- | ------- |
| `MG_SMPP_ADDRESS` | SMPP address in `host:port` format | "" |
| `MG_SMPP_USERNAME` | SMPP username | "" |
| `MG_SMPP_PASSWORD` | SMPP password | "" |
| `MG_SMPP_SYSTEM_TYPE` | SMPP system type | "" |
| `MG_SMPP_SRC_ADDR_TON` | SMPP source address TON | `0` |
| `MG_SMPP_DST_ADDR_TON` | SMPP source address NPI | `0` |
| `MG_SMPP_SRC_ADDR_NPI` | SMPP destination address TON | `0` |
| `MG_SMPP_DST_ADDR_NPI` | SMPP destination address NPI | `0` |
Note: The SMPP env tags are mapped exactly as defined in `consumers/notifiers/smpp/config.go`.
@@ -43,32 +43,32 @@ Note: The SMPP env tags are mapped exactly as defined in `consumers/notifiers/sm
Defined in `consumers/notifiers/smpp/README.md`.
| Variable | Description | Default |
| --- | --- | --- |
| `MG_SMPP_NOTIFIER_LOG_LEVEL` | Log level for SMPP notifier | `info` |
| `MG_SMPP_NOTIFIER_FROM_ADDRESS` | From address for SMS notifications | "" |
| `MG_SMPP_NOTIFIER_CONFIG_PATH` | Config file path for message broker subjects and payload type | `/config.toml` |
| `MG_SMPP_NOTIFIER_HTTP_HOST` | Service HTTP host | `localhost` |
| `MG_SMPP_NOTIFIER_HTTP_PORT` | Service HTTP port | `9014` |
| `MG_SMPP_NOTIFIER_HTTP_SERVER_CERT` | Service HTTP server certificate path | "" |
| `MG_SMPP_NOTIFIER_HTTP_SERVER_KEY` | Service HTTP server key path | "" |
| `MG_SMPP_NOTIFIER_DB_HOST` | Database host address | `localhost` |
| `MG_SMPP_NOTIFIER_DB_PORT` | Database host port | `5432` |
| `MG_SMPP_NOTIFIER_DB_USER` | Database user | `magistrala` |
| `MG_SMPP_NOTIFIER_DB_PASS` | Database password | `magistrala` |
| `MG_SMPP_NOTIFIER_DB_NAME` | Database name | `subscriptions` |
| `MG_SMPP_NOTIFIER_DB_SSL_MODE` | DB SSL mode (disable, require, verify-ca, verify-full) | `disable` |
| `MG_SMPP_NOTIFIER_DB_SSL_CERT` | DB SSL client cert path | "" |
| `MG_SMPP_NOTIFIER_DB_SSL_KEY` | DB SSL client key path | "" |
| `MG_SMPP_NOTIFIER_DB_SSL_ROOT_CERT` | DB SSL root cert path | "" |
| `MG_AUTH_GRPC_URL` | Auth gRPC URL | `localhost:7001` |
| `MG_AUTH_GRPC_TIMEOUT` | Auth gRPC timeout | `1s` |
| `MG_AUTH_GRPC_CLIENT_TLS` | Auth client TLS flag | `false` |
| `MG_AUTH_GRPC_CA_CERT` | Auth client CA certs path | "" |
| `MG_MESSAGE_BROKER_URL` | Message broker URL | `nats://127.0.0.1:4222` |
| `MG_JAEGER_URL` | Jaeger tracing URL | `http://jaeger:14268/api/traces` |
| `MG_SEND_TELEMETRY` | Send telemetry to Magistrala call-home server | `true` |
| `MG_SMPP_NOTIFIER_INSTANCE_ID` | SMPP notifier instance ID | "" |
| Variable | Description | Default |
| ----------------------------------- | ------------------------------------------------------------- | -------------------------------- |
| `MG_SMPP_NOTIFIER_LOG_LEVEL` | Log level for SMPP notifier | `info` |
| `MG_SMPP_NOTIFIER_FROM_ADDRESS` | From address for SMS notifications | "" |
| `MG_SMPP_NOTIFIER_CONFIG_PATH` | Config file path for message broker subjects and payload type | `/config.toml` |
| `MG_SMPP_NOTIFIER_HTTP_HOST` | Service HTTP host | `localhost` |
| `MG_SMPP_NOTIFIER_HTTP_PORT` | Service HTTP port | `9014` |
| `MG_SMPP_NOTIFIER_HTTP_SERVER_CERT` | Service HTTP server certificate path | "" |
| `MG_SMPP_NOTIFIER_HTTP_SERVER_KEY` | Service HTTP server key path | "" |
| `MG_SMPP_NOTIFIER_DB_HOST` | Database host address | `localhost` |
| `MG_SMPP_NOTIFIER_DB_PORT` | Database host port | `5432` |
| `MG_SMPP_NOTIFIER_DB_USER` | Database user | `magistrala` |
| `MG_SMPP_NOTIFIER_DB_PASS` | Database password | `magistrala` |
| `MG_SMPP_NOTIFIER_DB_NAME` | Database name | `subscriptions` |
| `MG_SMPP_NOTIFIER_DB_SSL_MODE` | DB SSL mode (disable, require, verify-ca, verify-full) | `disable` |
| `MG_SMPP_NOTIFIER_DB_SSL_CERT` | DB SSL client cert path | "" |
| `MG_SMPP_NOTIFIER_DB_SSL_KEY` | DB SSL client key path | "" |
| `MG_SMPP_NOTIFIER_DB_SSL_ROOT_CERT` | DB SSL root cert path | "" |
| `MG_AUTH_GRPC_URL` | Auth gRPC URL | `localhost:7001` |
| `MG_AUTH_GRPC_TIMEOUT` | Auth gRPC timeout | `1s` |
| `MG_AUTH_GRPC_CLIENT_TLS` | Auth client TLS flag | `false` |
| `MG_AUTH_GRPC_CA_CERT` | Auth client CA certs path | "" |
| `MG_MESSAGE_BROKER_URL` | Message broker URL | `nats://127.0.0.1:4222` |
| `MG_JAEGER_URL` | Jaeger tracing URL | `http://jaeger:14268/api/traces` |
| `MG_SEND_TELEMETRY` | Send telemetry to Magistrala call-home server | `true` |
| `MG_SMPP_NOTIFIER_INSTANCE_ID` | SMPP notifier instance ID | "" |
## Features
@@ -84,7 +84,7 @@ Defined in `consumers/notifiers/smpp/README.md`.
1. Clients register subscriptions through the HTTP API (`topic` + `contact`).
2. The service authenticates the token, assigns an owner ID, and persists the subscription.
3. When a message arrives, the service builds the topic as `channel` or `channel.subtopic`, retrieves matching subscriptions, and gathers contacts.
3. When a message arrives, the service builds the topic as `channel` or `channel/subtopic`, retrieves matching subscriptions, and gathers contacts.
4. The notifier implementation sends notifications using the configured backend.
### Components
@@ -99,12 +99,12 @@ Defined in `consumers/notifiers/smpp/README.md`.
Defined in `consumers/notifiers/postgres/init.go`:
| Column | Type | Description |
| --- | --- | --- |
| `id` | `VARCHAR(254)` | Subscription identifier (primary key) |
| `owner_id` | `VARCHAR(254)` | Owner ID derived from the auth token |
| `contact` | `VARCHAR(254)` | Notification contact (email or phone) |
| `topic` | `TEXT` | Topic to match (`channel` or `channel.subtopic`) |
| Column | Type | Description |
| ---------- | -------------- | ------------------------------------------------ |
| `id` | `VARCHAR(254)` | Subscription identifier (primary key) |
| `owner_id` | `VARCHAR(254)` | Owner ID derived from the auth token |
| `contact` | `VARCHAR(254)` | Notification contact (email or phone) |
| `topic` | `TEXT` | Topic to match (`channel` or `channel/subtopic`) |
Constraint: `UNIQUE(topic, contact)`
@@ -129,13 +129,13 @@ go test ./consumers/notifiers/...
The Notifiers service supports the following operations (see `apidocs/openapi/notifiers.yaml`):
| Operation | Method & Path | Description |
| --- | --- | --- |
| `createSubscription` | `POST /subscriptions` | Create a new subscription |
| `listSubscriptions` | `GET /subscriptions` | List subscriptions with filters |
| `viewSubscription` | `GET /subscriptions/{id}` | Retrieve a subscription |
| `removeSubscription` | `DELETE /subscriptions/{id}` | Delete a subscription |
| `health` | `GET /health` | Service health check |
| Operation | Method & Path | Description |
| -------------------- | ---------------------------- | ------------------------------- |
| `createSubscription` | `POST /subscriptions` | Create a new subscription |
| `listSubscriptions` | `GET /subscriptions` | List subscriptions with filters |
| `viewSubscription` | `GET /subscriptions/{id}` | Retrieve a subscription |
| `removeSubscription` | `DELETE /subscriptions/{id}` | Delete a subscription |
| `health` | `GET /health` | Service health check |
### Example: Create a subscription
@@ -144,7 +144,7 @@ curl -X POST http://localhost:9014/subscriptions \
-H "Authorization: Bearer <your_access_token>" \
-H "Content-Type: application/json" \
-d '{
"topic": "channel.subtopic",
"topic": "channel/subtopic",
"contact": "user@example.com"
}'
```
@@ -152,7 +152,7 @@ curl -X POST http://localhost:9014/subscriptions \
### Example: List subscriptions
```bash
curl -X GET "http://localhost:9014/subscriptions?topic=channel.subtopic&contact=user@example.com&limit=20&offset=0" \
curl -X GET "http://localhost:9014/subscriptions?topic=channel/subtopic&contact=user@example.com&limit=20&offset=0" \
-H "Authorization: Bearer <your_access_token>"
```
+14
View File
@@ -23,6 +23,20 @@ func Migration() *migrate.MemoryMigrationSource {
"DROP TABLE IF EXISTS subscriptions",
},
},
{
Id: "subscriptions_2",
Up: []string{
// Canonicalize legacy dot-delimited topics to slash-delimited topics.
`UPDATE subscriptions
SET topic = REPLACE(topic, '.', '/')
WHERE topic LIKE '%.%'`,
},
Down: []string{
`UPDATE subscriptions
SET topic = REPLACE(topic, '/', '.')
WHERE topic LIKE '%/%'`,
},
},
},
}
}
+44 -29
View File
@@ -5,12 +5,12 @@ package notifiers
import (
"context"
"fmt"
"github.com/absmach/supermq"
"github.com/absmach/supermq/consumers"
smqauthn "github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/errors"
repoerr "github.com/absmach/supermq/pkg/errors/repository"
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/messaging"
)
@@ -112,24 +112,11 @@ func (ns *notifierService) ConsumeBlocking(ctx context.Context, message any) err
if !ok {
return ErrMessage
}
topic := msg.GetChannel()
if msg.GetSubtopic() != "" {
topic = fmt.Sprintf("%s.%s", msg.GetChannel(), msg.GetSubtopic())
}
pm := PageMetadata{
Topic: topic,
Offset: 0,
Limit: -1,
}
page, err := ns.subs.RetrieveAll(ctx, pm)
to, err := ns.recipients(ctx, msg)
if err != nil {
return err
}
var to []string
for _, sub := range page.Subscriptions {
to = append(to, sub.Contact)
}
if len(to) > 0 {
err := ns.notifier.Notify(ns.from, to, msg)
if err != nil {
@@ -146,25 +133,12 @@ func (ns *notifierService) ConsumeAsync(ctx context.Context, message any) {
ns.errCh <- ErrMessage
return
}
topic := msg.GetChannel()
if msg.GetSubtopic() != "" {
topic = fmt.Sprintf("%s.%s", msg.GetChannel(), msg.GetSubtopic())
}
pm := PageMetadata{
Topic: topic,
Offset: 0,
Limit: -1,
}
page, err := ns.subs.RetrieveAll(ctx, pm)
to, err := ns.recipients(ctx, msg)
if err != nil {
ns.errCh <- err
return
}
var to []string
for _, sub := range page.Subscriptions {
to = append(to, sub.Contact)
}
if len(to) > 0 {
if err := ns.notifier.Notify(ns.from, to, msg); err != nil {
ns.errCh <- errors.Wrap(consumers.ErrNotify, err)
@@ -175,3 +149,44 @@ func (ns *notifierService) ConsumeAsync(ctx context.Context, message any) {
func (ns *notifierService) Errors() <-chan error {
return ns.errCh
}
func (ns *notifierService) recipients(ctx context.Context, msg *messaging.Message) ([]string, error) {
topic, ok := subscriptionTopic(msg)
if !ok {
return nil, nil
}
pm := PageMetadata{
Topic: topic,
Offset: 0,
Limit: -1,
}
page, err := ns.subs.RetrieveAll(ctx, pm)
if err != nil {
if errors.Contains(err, repoerr.ErrNotFound) {
return nil, nil
}
return nil, err
}
to := make([]string, 0, len(page.Subscriptions))
for _, sub := range page.Subscriptions {
to = append(to, sub.Contact)
}
return to, nil
}
func subscriptionTopic(msg *messaging.Message) (string, bool) {
channel := msg.GetChannel()
if channel == "" {
return "", false
}
subtopic := msg.GetSubtopic()
if subtopic == "" {
return channel, true
}
return channel + "/" + subtopic, true
}
+27 -2
View File
@@ -31,12 +31,17 @@ const (
)
func newService() (notifiers.Service, *authnmocks.Authentication, *mocks.SubscriptionsRepository) {
svc, auth, repo, _ := newServiceWithNotifier()
return svc, auth, repo
}
func newServiceWithNotifier() (notifiers.Service, *authnmocks.Authentication, *mocks.SubscriptionsRepository, *smqmocks.Notifier) {
repo := new(mocks.SubscriptionsRepository)
auth := new(authnmocks.Authentication)
notifier := new(smqmocks.Notifier)
idp := uuid.NewMock()
from := "exampleFrom"
return notifiers.New(auth, repo, idp, notifier, from), auth, repo
return notifiers.New(auth, repo, idp, notifier, from), auth, repo, notifier
}
func TestCreateSubscription(t *testing.T) {
@@ -324,7 +329,7 @@ func TestRemoveSubscription(t *testing.T) {
}
func TestConsume(t *testing.T) {
svc, _, repo := newService()
svc, _, repo, notifier := newServiceWithNotifier()
msg := messaging.Message{
Channel: "topic",
Subtopic: "subtopic",
@@ -357,4 +362,24 @@ func TestConsume(t *testing.T) {
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
repoCall.Unset()
}
canonicalMsg := &messaging.Message{
Channel: "topic",
Subtopic: "sub/topic",
}
repo.On("RetrieveAll", context.TODO(), notifiers.PageMetadata{
Topic: "topic/sub/topic",
Offset: 0,
Limit: -1,
}).Return(notifiers.Page{
Subscriptions: []notifiers.Subscription{
{Contact: "user@example.com"},
},
}, nil).Once()
notifier.On("Notify", "exampleFrom", []string{"user@example.com"}, canonicalMsg).Return(nil).Once()
err := svc.ConsumeBlocking(context.TODO(), canonicalMsg)
assert.NoError(t, err)
}
+86 -86
View File
@@ -15,66 +15,66 @@ Values shown are from [docker/.env](https://github.com/absmach/magistrala/blob/m
#### Postgres Service endpoints
| Variable | Description | Default |
| --- | --- | --- |
| `MG_POSTGRES_WRITER_LOG_LEVEL` | Service log level | `debug` |
| `MG_POSTGRES_WRITER_CONFIG_PATH` | Config file path (topics/transformer) | `/config.toml` |
| `MG_POSTGRES_WRITER_HTTP_HOST` | HTTP host | `postgres-writer` |
| `MG_POSTGRES_WRITER_HTTP_PORT` | HTTP port | `9007` |
| `MG_POSTGRES_WRITER_HTTP_SERVER_CERT` | HTTPS server certificate path | "" |
| `MG_POSTGRES_WRITER_HTTP_SERVER_KEY` | HTTPS server key path | "" |
| `MG_POSTGRES_WRITER_INSTANCE_ID` | Instance ID | "" |
| Variable | Description | Default |
| ------------------------------------- | ------------------------------------- | ----------------- |
| `MG_POSTGRES_WRITER_LOG_LEVEL` | Service log level | `debug` |
| `MG_POSTGRES_WRITER_CONFIG_PATH` | Config file path (topics/transformer) | `/config.toml` |
| `MG_POSTGRES_WRITER_HTTP_HOST` | HTTP host | `postgres-writer` |
| `MG_POSTGRES_WRITER_HTTP_PORT` | HTTP port | `9007` |
| `MG_POSTGRES_WRITER_HTTP_SERVER_CERT` | HTTPS server certificate path | "" |
| `MG_POSTGRES_WRITER_HTTP_SERVER_KEY` | HTTPS server key path | "" |
| `MG_POSTGRES_WRITER_INSTANCE_ID` | Instance ID | "" |
#### Postgres Database
| Variable | Description | Default |
| --- | --- | --- |
| `MG_POSTGRES_HOST` | PostgreSQL host | `postgres` |
| `MG_POSTGRES_PORT` | PostgreSQL port | `5432` |
| `MG_POSTGRES_USER` | PostgreSQL user | `supermq` |
| `MG_POSTGRES_PASS` | PostgreSQL password | `supermq` |
| `MG_POSTGRES_NAME` | PostgreSQL database name | `messages` |
| `MG_POSTGRES_SSL_MODE` | PostgreSQL SSL mode | `disable` |
| `MG_POSTGRES_SSL_CERT` | PostgreSQL SSL client cert | "" |
| `MG_POSTGRES_SSL_KEY` | PostgreSQL SSL client key | "" |
| `MG_POSTGRES_SSL_ROOT_CERT` | PostgreSQL SSL root cert | "" |
| Variable | Description | Default |
| --------------------------- | -------------------------- | ---------- |
| `MG_POSTGRES_HOST` | PostgreSQL host | `postgres` |
| `MG_POSTGRES_PORT` | PostgreSQL port | `5432` |
| `MG_POSTGRES_USER` | PostgreSQL user | `supermq` |
| `MG_POSTGRES_PASS` | PostgreSQL password | `supermq` |
| `MG_POSTGRES_NAME` | PostgreSQL database name | `messages` |
| `MG_POSTGRES_SSL_MODE` | PostgreSQL SSL mode | `disable` |
| `MG_POSTGRES_SSL_CERT` | PostgreSQL SSL client cert | "" |
| `MG_POSTGRES_SSL_KEY` | PostgreSQL SSL client key | "" |
| `MG_POSTGRES_SSL_ROOT_CERT` | PostgreSQL SSL root cert | "" |
#### Postgres Message broker and observability
| Variable | Description | Default |
| --- | --- | --- |
| `MG_MESSAGE_BROKER_URL` | Message broker URL | `nats://nats:4222` |
| `MG_JAEGER_URL` | Jaeger collector endpoint | `http://jaeger:4318/v1/traces` |
| `MG_JAEGER_TRACE_RATIO` | Trace sampling ratio | `1.0` |
| `MG_SEND_TELEMETRY` | Send telemetry to Magistrala call-home server | `true` |
| Variable | Description | Default |
| ----------------------- | --------------------------------------------- | ------------------------------ |
| `MG_MESSAGE_BROKER_URL` | Message broker URL | `nats://nats:4222` |
| `MG_JAEGER_URL` | Jaeger collector endpoint | `http://jaeger:4318/v1/traces` |
| `MG_JAEGER_TRACE_RATIO` | Trace sampling ratio | `1.0` |
| `MG_SEND_TELEMETRY` | Send telemetry to Magistrala call-home server | `true` |
### Timescale writer
#### Timescale Service endpoints
| Variable | Description | Default |
| --- | --- | --- |
| `MG_TIMESCALE_WRITER_LOG_LEVEL` | Service log level | `debug` |
| `MG_TIMESCALE_WRITER_CONFIG_PATH` | Config file path (topics/transformer) | `/config.toml` |
| `MG_TIMESCALE_WRITER_HTTP_HOST` | HTTP host | `timescale-writer` |
| `MG_TIMESCALE_WRITER_HTTP_PORT` | HTTP port | `9012` |
| `MG_TIMESCALE_WRITER_HTTP_SERVER_CERT` | HTTPS server certificate path | "" |
| `MG_TIMESCALE_WRITER_HTTP_SERVER_KEY` | HTTPS server key path | "" |
| `MG_TIMESCALE_WRITER_INSTANCE_ID` | Instance ID | "" |
| Variable | Description | Default |
| -------------------------------------- | ------------------------------------- | ------------------ |
| `MG_TIMESCALE_WRITER_LOG_LEVEL` | Service log level | `debug` |
| `MG_TIMESCALE_WRITER_CONFIG_PATH` | Config file path (topics/transformer) | `/config.toml` |
| `MG_TIMESCALE_WRITER_HTTP_HOST` | HTTP host | `timescale-writer` |
| `MG_TIMESCALE_WRITER_HTTP_PORT` | HTTP port | `9012` |
| `MG_TIMESCALE_WRITER_HTTP_SERVER_CERT` | HTTPS server certificate path | "" |
| `MG_TIMESCALE_WRITER_HTTP_SERVER_KEY` | HTTPS server key path | "" |
| `MG_TIMESCALE_WRITER_INSTANCE_ID` | Instance ID | "" |
#### Timescale Database
| Variable | Description | Default |
| --- | --- | --- |
| `MG_TIMESCALE_HOST` | TimescaleDB host | `timescale` |
| `MG_TIMESCALE_PORT` | TimescaleDB port | `5432` |
| `MG_TIMESCALE_USER` | TimescaleDB user | `supermq` |
| `MG_TIMESCALE_PASS` | TimescaleDB password | `supermq` |
| `MG_TIMESCALE_NAME` | TimescaleDB database name | `supermq` |
| `MG_TIMESCALE_SSL_MODE` | TimescaleDB SSL mode | `disable` |
| `MG_TIMESCALE_SSL_CERT` | TimescaleDB SSL client cert | "" |
| `MG_TIMESCALE_SSL_KEY` | TimescaleDB SSL client key | "" |
| `MG_TIMESCALE_SSL_ROOT_CERT` | TimescaleDB SSL root cert | "" |
| Variable | Description | Default |
| ---------------------------- | --------------------------- | ----------- |
| `MG_TIMESCALE_HOST` | TimescaleDB host | `timescale` |
| `MG_TIMESCALE_PORT` | TimescaleDB port | `5432` |
| `MG_TIMESCALE_USER` | TimescaleDB user | `supermq` |
| `MG_TIMESCALE_PASS` | TimescaleDB password | `supermq` |
| `MG_TIMESCALE_NAME` | TimescaleDB database name | `supermq` |
| `MG_TIMESCALE_SSL_MODE` | TimescaleDB SSL mode | `disable` |
| `MG_TIMESCALE_SSL_CERT` | TimescaleDB SSL client cert | "" |
| `MG_TIMESCALE_SSL_KEY` | TimescaleDB SSL client key | "" |
| `MG_TIMESCALE_SSL_ROOT_CERT` | TimescaleDB SSL root cert | "" |
#### Timescale Message broker and observability
@@ -91,7 +91,7 @@ The config file controls subscription topics and optional transformer settings f
```toml
["subscriber"]
topics = ["writers.>"]
topics = ["writers/#"]
[transformer]
format = "senml"
@@ -104,24 +104,24 @@ time_fields = [
]
```
The topic filter uses `writers.*` syntax in the config file for both backends. Writers do not expose broker mode, delivery policy, or consumer-group settings in this file. They always consume through the stream-backed broker adapter in `consumers/writers/brokers`:
The topic filter uses slash-delimited MQTT-style syntax (`+`, `#`) in the config file for both backends. Writers do not expose broker mode, delivery policy, or consumer-group settings in this file. They always consume through the stream-backed broker adapter in `consumers/writers/brokers`:
- NATS builds use JetStream streams with durable consumers.
- FluxMQ builds publish to and consume from the `writers` stream queue while preserving the same `writers.>` config syntax.
- FluxMQ builds publish to and consume from the `writers` stream queue while preserving the same `writers/#` config syntax.
## Features
- **Message persistence**: Stores incoming SenML messages into PostgreSQL or TimescaleDB.
- **JSON payload support**: Saves JSON payloads into dynamically created tables.
- **Stream-backed ingestion**: Consumes through NATS JetStream durable consumers or FluxMQ stream queues.
- **Configurable subscription**: Limits ingestion to specific `writers.*` topics.
- **Configurable subscription**: Limits ingestion to specific `writers/<channel>/<subtopic>` topics.
- **Observability**: Exposes `/health` and `/metrics` endpoints, with Jaeger tracing.
## Architecture
### Runtime flow
1. The rules engine publishes writer messages under `writers.*`.
1. The rules engine publishes writer messages under `writers/<channel>/<subtopic>`.
2. The writer loads `config.toml` to select topic filters and transformer settings.
3. The broker adapter consumes from the underlying stream-backed implementation.
4. The consumer converts messages to SenML or JSON payloads.
@@ -138,22 +138,22 @@ The topic filter uses `writers.*` syntax in the config file for both backends. W
Defined in `consumers/writers/postgres/init.go`:
| Column | Type | Description |
| --- | --- | --- |
| `id` | `UUID` | Message ID |
| `channel` | `UUID` | Channel ID |
| `subtopic` | `VARCHAR(254)` | Subtopic |
| `publisher` | `UUID` | Publisher ID |
| `protocol` | `TEXT` | Protocol name |
| `name` | `TEXT` | SenML name |
| `unit` | `TEXT` | SenML unit |
| `value` | `FLOAT` | Numeric value |
| `string_value` | `TEXT` | String value |
| `bool_value` | `BOOL` | Boolean value |
| `data_value` | `BYTEA` | Data value |
| `sum` | `FLOAT` | Sum value |
| `time` | `FLOAT` | Measurement time |
| `update_time` | `FLOAT` | Update time |
| Column | Type | Description |
| -------------- | -------------- | ---------------- |
| `id` | `UUID` | Message ID |
| `channel` | `UUID` | Channel ID |
| `subtopic` | `VARCHAR(254)` | Subtopic |
| `publisher` | `UUID` | Publisher ID |
| `protocol` | `TEXT` | Protocol name |
| `name` | `TEXT` | SenML name |
| `unit` | `TEXT` | SenML unit |
| `value` | `FLOAT` | Numeric value |
| `string_value` | `TEXT` | String value |
| `bool_value` | `BOOL` | Boolean value |
| `data_value` | `BYTEA` | Data value |
| `sum` | `FLOAT` | Sum value |
| `time` | `FLOAT` | Measurement time |
| `update_time` | `FLOAT` | Update time |
Primary key: `(time, publisher, subtopic, name)`
@@ -161,21 +161,21 @@ Primary key: `(time, publisher, subtopic, name)`
Defined in `consumers/writers/timescale/init.go`:
| Column | Type | Description |
| --- | --- | --- |
| `time` | `BIGINT` | Measurement time |
| `channel` | `UUID` | Channel ID |
| `subtopic` | `VARCHAR(254)` | Subtopic |
| `publisher` | `VARCHAR(254)` | Publisher ID |
| `protocol` | `TEXT` | Protocol name |
| `name` | `VARCHAR(254)` | SenML name |
| `unit` | `TEXT` | SenML unit |
| `value` | `FLOAT` | Numeric value |
| `string_value` | `TEXT` | String value |
| `bool_value` | `BOOL` | Boolean value |
| `data_value` | `BYTEA` | Data value |
| `sum` | `FLOAT` | Sum value |
| `update_time` | `FLOAT` | Update time |
| Column | Type | Description |
| -------------- | -------------- | ---------------- |
| `time` | `BIGINT` | Measurement time |
| `channel` | `UUID` | Channel ID |
| `subtopic` | `VARCHAR(254)` | Subtopic |
| `publisher` | `VARCHAR(254)` | Publisher ID |
| `protocol` | `TEXT` | Protocol name |
| `name` | `VARCHAR(254)` | SenML name |
| `unit` | `TEXT` | SenML unit |
| `value` | `FLOAT` | Numeric value |
| `string_value` | `TEXT` | String value |
| `bool_value` | `BOOL` | Boolean value |
| `data_value` | `BYTEA` | Data value |
| `sum` | `FLOAT` | Sum value |
| `update_time` | `FLOAT` | Update time |
Primary key: `(time, channel, subtopic, protocol, publisher, name)`
@@ -262,10 +262,10 @@ go test ./consumers/writers/...
Writers do not expose a message ingestion API. Messages are written via the message broker, and writers consume them through the stream-backed broker adapter. The HTTP API provides only health and metrics endpoints.
| Endpoint | Description |
| --- | --- |
| `GET /health` | Service health check |
| `GET /metrics` | Prometheus metrics |
| Endpoint | Description |
| -------------- | -------------------- |
| `GET /health` | Service health check |
| `GET /metrics` | Prometheus metrics |
For an in-depth explanation of Writers, see the [official documentation][doc].
+1 -1
View File
@@ -17,7 +17,7 @@ import (
)
const (
AllTopic = "writers.>"
AllTopic = "writers/#"
prefix = "writers"
)
+1 -1
View File
@@ -281,7 +281,7 @@ MG_GROUPS_GRPC_CLIENT_CA_CERTS=${GRPC_MTLS:+./ssl/certs/ca.crt}
MG_CLIENTS_LOG_LEVEL=debug
MG_CLIENTS_STANDALONE_ID=
MG_CLIENTS_STANDALONE_TOKEN=
MG_CLIENTS_CACHE_KEY_DURATION=10m
MG_CLIENTS_CACHE_KEY_DURATION=1h
MG_CLIENTS_HTTP_HOST=clients
MG_CLIENTS_HTTP_PORT=9006
MG_CLIENTS_GRPC_HOST=clients
+6 -6
View File
@@ -2,13 +2,13 @@
# SPDX-License-Identifier: Apache-2.0
# Writers consume through the broker's stream-backed path; this file only
# selects topic filters. Use NATS-style filters here even when built with
# FluxMQ.
# To listen on all writer topics use the default value "writers.>".
# To subscribe to specific topics use values starting with "writers." and
# followed by a subtopic (e.g. ["writers.<channel_id>.sub.topic.x", ...]).
# selects topic filters. Use slash-delimited MQTT-style filters (`+`, `#`)
# for both NATS and FluxMQ builds.
# To listen on all writer topics use the default value "writers/#".
# To subscribe to specific topics use values starting with "writers/" and
# followed by a subtopic (e.g. ["writers/<channel_id>/sub/topic/x", ...]).
["subscriber"]
topics = ["writers.>"]
topics = ["writers/#"]
[transformer]
# SenML or JSON
+6 -6
View File
@@ -2,10 +2,10 @@
# SPDX-License-Identifier: Apache-2.0
# Writers consume through the broker's stream-backed path; this file only
# selects topic filters. Use NATS-style filters here even when built with
# FluxMQ.
# To listen on all writer topics use the default value "writers.>".
# To subscribe to specific topics use values starting with "writers." and
# followed by a subtopic (e.g. ["writers.<channel_id>.sub.topic.x", ...]).
# selects topic filters. Use slash-delimited MQTT-style filters (`+`, `#`)
# for both NATS and FluxMQ builds.
# To listen on all writer topics use the default value "writers/#".
# To subscribe to specific topics use values starting with "writers/" and
# followed by a subtopic (e.g. ["writers/<channel_id>/sub/topic/x", ...]).
["subscriber"]
topics = ["writers.>"]
topics = ["writers/#"]
+1 -1
View File
@@ -122,7 +122,7 @@ queues:
auth:
url: "http://fluxmq-auth:7016"
transport: "grpc"
timeout: 5s
timeout: 15s
protocols:
mqtt: true
http: true
+1 -1
View File
@@ -119,7 +119,7 @@ queues:
auth:
url: "http://fluxmq-auth:7016"
transport: "grpc"
timeout: 5s
timeout: 15s
protocols:
mqtt: true
http: true
+1 -1
View File
@@ -119,7 +119,7 @@ queues:
auth:
url: "http://fluxmq-auth:7016"
transport: "grpc"
timeout: 5s
timeout: 15s
protocols:
mqtt: true
http: true
+17 -6
View File
@@ -6,13 +6,13 @@ package journal
import (
"context"
"fmt"
"strings"
"time"
"github.com/absmach/supermq"
smqauthn "github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/messaging"
)
const (
@@ -28,6 +28,7 @@ const (
var (
errSaveJournal = errors.New("failed to save journal")
errHandleTelemetry = errors.New("failed to handle client telemetry")
errInvalidSubTopic = errors.New("invalid subscribe topic")
)
type service struct {
@@ -139,10 +140,9 @@ func (svc *service) addSubscription(ctx context.Context, journal Journal) error
if err != nil {
return err
}
var subtopic string
topics := strings.Split(ae.topic, ".")
if len(topics) > 2 {
subtopic = topics[2]
channelID, subtopic, err := parseSubscriptionTopic(ae.topic)
if err != nil {
return err
}
id, err := svc.idProvider.ID()
@@ -153,7 +153,7 @@ func (svc *service) addSubscription(ctx context.Context, journal Journal) error
sub := ClientSubscription{
ID: id,
SubscriberID: ae.subscriberID,
ChannelID: topics[1],
ChannelID: channelID,
Subtopic: subtopic,
ClientID: ae.clientID,
}
@@ -161,6 +161,17 @@ func (svc *service) addSubscription(ctx context.Context, journal Journal) error
return svc.repository.AddSubscription(ctx, sub)
}
func parseSubscriptionTopic(topic string) (string, string, error) {
_, channelID, subtopic, _, err := messaging.ParseSubscribeTopic(topic)
if err != nil {
return "", "", errors.Wrap(errInvalidSubTopic, err)
}
if channelID == "" {
return "", "", errInvalidSubTopic
}
return channelID, subtopic, nil
}
func (svc *service) addMqttSubscription(ctx context.Context, journal Journal) error {
ae, err := toMqttSubscribeEvent(journal)
if err != nil {
+1 -1
View File
@@ -16,7 +16,7 @@ import (
)
// StreamAllEvents represents subject to subscribe for all the events.
const StreamAllEvents = "events.>"
const StreamAllEvents = "events/#"
func init() {
log.Println("The binary was build using Nats as the events store")
+3 -3
View File
@@ -1,8 +1,8 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
//go:build msg_fluxmq
// +build msg_fluxmq
//go:build !msg_nats
// +build !msg_nats
package brokers
@@ -16,7 +16,7 @@ import (
)
// SubjectAllMessages represents subject to subscribe for all the messages.
const SubjectAllMessages = string(messaging.MsgTopicPrefix) + ".>"
const SubjectAllMessages = string(messaging.MsgTopicPrefix) + "/#"
func init() {
log.Println("The binary was built using FluxMQ as the message broker")
+3 -3
View File
@@ -1,8 +1,8 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
//go:build !msg_fluxmq && !msg_rabbitmq && !rabbitmq
// +build !msg_fluxmq,!msg_rabbitmq,!rabbitmq
//go:build msg_nats
// +build msg_nats
package brokers
@@ -16,7 +16,7 @@ import (
)
// SubjectAllMessages represents subject to subscribe for all the messages.
const SubjectAllMessages = string(messaging.MsgTopicPrefix) + ".>"
const SubjectAllMessages = string(messaging.MsgTopicPrefix) + "/#"
func init() {
log.Println("The binary was built using NATS as the message broker")
+9 -2
View File
@@ -5,6 +5,7 @@ package fluxmq
import (
"errors"
"strings"
"github.com/absmach/supermq/pkg/messaging"
"github.com/nats-io/nats.go/jetstream"
@@ -32,9 +33,15 @@ func Prefix(prefix string) messaging.Option {
return func(val any) error {
switch v := val.(type) {
case *publisher:
v.prefix = prefix
v.prefix = strings.TrimSpace(prefix)
if v.prefix == "" {
v.prefix = msgPrefix
}
case *pubsub:
v.prefix = prefix
v.prefix = strings.TrimSpace(prefix)
if v.prefix == "" {
v.prefix = msgPrefix
}
default:
return ErrInvalidType
}
+1 -3
View File
@@ -95,7 +95,7 @@ func (ps *pubsub) Subscribe(_ context.Context, cfg messaging.SubscriberConfig) e
group := formatConsumerName(cfg.Topic, cfg.ID)
opts := &fluxamqp.StreamConsumeOptions{
QueueName: streamQueue(ps.prefix),
QueueName: ps.prefix,
Filter: streamFilter(ps.prefix, cfg.Topic),
ConsumerGroup: group,
}
@@ -118,7 +118,6 @@ func (ps *pubsub) Subscribe(_ context.Context, cfg messaging.SubscriberConfig) e
sub := subscription{
streamTopic: queueFilter(ps.prefix, cfg.Topic),
}
if ps.directTopicIngress {
// Subscribe to regular MQTT topics so that messages published directly
// by MQTT clients (not through the stream queue) are also received.
@@ -137,7 +136,6 @@ func (ps *pubsub) Subscribe(_ context.Context, cfg messaging.SubscriberConfig) e
ps.mu.Lock()
ps.subscriptions[subscriptionKey(cfg.ID, cfg.Topic)] = sub
ps.mu.Unlock()
return nil
}
+64 -82
View File
@@ -13,113 +13,88 @@ import (
const queuePrefix = "$queue/"
var (
topicReplacer = strings.NewReplacer(".", "/", "*", "+", ">", "#")
nameReplacer = strings.NewReplacer(
" ", "_",
".", "_",
"*", "_",
">", "_",
"/", "_",
"\\", "_",
)
var nameReplacer = strings.NewReplacer(
" ", "_",
".", "_",
"*", "_",
">", "_",
"/", "_",
"\\", "_",
"+", "_",
"#", "_",
)
func canonicalPrefix(prefix string) string {
prefix = strings.TrimSpace(prefix)
if prefix == "" {
return msgPrefix
}
return prefix
}
func streamQueue(prefix string) string {
return canonicalPrefix(prefix)
}
func brokerPath(topic string) string {
topic = strings.TrimSpace(topic)
topic = strings.TrimPrefix(topic, ".")
if topic == "" {
return ""
}
return topicReplacer.Replace(topic)
}
func streamFilter(prefix, topic string) string {
path := filterPath(prefix, topic)
if path == "" {
return "#"
}
return path
}
func queueFilter(prefix, topic string) string {
queue := streamQueue(prefix)
path := streamFilter(prefix, topic)
if path == "#" {
return queuePrefix + queue + "/#"
}
return queuePrefix + queue + "/" + path
}
func queueTopic(prefix, topic string) string {
queue := streamQueue(prefix)
path := brokerPath(topic)
if path == "" {
return queuePrefix + queue
}
return queuePrefix + queue + "/" + path
}
func filterPath(prefix, topic string) string {
topic = strings.TrimSpace(topic)
if topic == "" || topic == ">" {
if topic == "" || topic == "#" {
return "#"
}
prefix = canonicalPrefix(prefix)
switch {
case topic == prefix:
topic = ">"
case strings.HasPrefix(topic, prefix+"."):
topic = strings.TrimPrefix(topic, prefix+".")
return "#"
case strings.HasPrefix(topic, prefix+"/"):
topic = strings.TrimPrefix(topic, prefix+"/")
}
return brokerPath(topic)
}
topic = strings.TrimPrefix(strings.TrimSpace(topic), "/")
if topic == "" {
return "#"
}
func formatConsumerName(topic, id string) string {
// Consumer group names must avoid whitespace and wildcard/path separators.
topic = nameReplacer.Replace(topic)
id = nameReplacer.Replace(id)
return fmt.Sprintf("%s-%s", topic, id)
return topic
}
// topicFilter returns the MQTT topic filter for subscribing to regular
// (non-queued) messages. It converts a NATS-style topic to MQTT format
// with the prefix prepended.
// For example, with prefix "m" and topic "m.>", it returns "m/#".
// (non-queued) messages. It strips the prefix and re-prepends it to
// normalize the filter.
func topicFilter(prefix, topic string) string {
prefix = canonicalPrefix(prefix)
path := filterPath(prefix, topic)
if path == "" || path == "#" {
topic = strings.TrimSpace(topic)
if topic == "" || topic == "#" {
return prefix + "/#"
}
return prefix + "/" + path
switch {
case topic == prefix:
return prefix + "/#"
case strings.HasPrefix(topic, prefix+"/"):
topic = strings.TrimPrefix(topic, prefix+"/")
}
topic = strings.TrimPrefix(strings.TrimSpace(topic), "/")
if topic == "" || topic == "#" {
return prefix + "/#"
}
return prefix + "/" + topic
}
func queueFilter(prefix, topic string) string {
path := streamFilter(prefix, topic)
if path == "#" {
return queuePrefix + prefix + "/#"
}
return queuePrefix + prefix + "/" + path
}
func queueTopic(prefix, topic string) string {
topic = strings.TrimPrefix(strings.TrimSpace(topic), "/")
if topic == "" {
return queuePrefix + prefix
}
return queuePrefix + prefix + "/" + topic
}
func parseMQTTTopic(prefix, topic string) (domainID, channelID, subtopic string, err error) {
topic = strings.TrimPrefix(strings.TrimSpace(topic), "/")
prefix = canonicalPrefix(prefix)
if !strings.HasPrefix(topic, prefix+"/") {
return "", "", "", messaging.ErrMalformedTopic
}
normalized := "/" + msgPrefix + "/" + strings.TrimPrefix(topic, prefix+"/")
// Replace the broker-specific prefix with the canonical message prefix
// so ParseSubscribeTopic can parse it.
normalized := msgPrefix + "/" + strings.TrimPrefix(topic, prefix+"/")
domainID, channelID, subtopic, _, err = messaging.ParseSubscribeTopic(normalized)
if err != nil {
@@ -149,8 +124,15 @@ func stringHeader(headers map[string]any, key string) string {
func declareStream(client *fluxamqp.Client, prefix string) error {
_, err := client.DeclareStreamQueue(&fluxamqp.StreamQueueOptions{
Name: streamQueue(prefix),
Name: prefix,
Durable: true,
})
return err
}
func formatConsumerName(topic, id string) string {
// Consumer group names must avoid whitespace and wildcard/path separators.
topic = nameReplacer.Replace(topic)
id = nameReplacer.Replace(id)
return fmt.Sprintf("%s-%s", topic, id)
}
+14 -14
View File
@@ -9,7 +9,7 @@ import (
)
func TestQueueTopic(t *testing.T) {
got := queueTopic("m", "domain.c.channel.subtopic")
got := queueTopic("m", "domain/c/channel/subtopic")
want := "$queue/m/domain/c/channel/subtopic"
if got != want {
t.Fatalf("queue topic mismatch: got %q, want %q", got, want)
@@ -26,25 +26,25 @@ func TestStreamFilter(t *testing.T) {
{
name: "all messages with prefix",
prefix: "m",
topic: "m.>",
topic: "m/#",
want: "#",
},
{
name: "all messages without explicit prefix",
prefix: "writers",
topic: ">",
topic: "#",
want: "#",
},
{
name: "specific topic filter",
prefix: "writers",
topic: "writers.domain.c.channel.*",
topic: "writers/domain/c/channel/+",
want: "domain/c/channel/+",
},
{
name: "topic without prefix",
prefix: "alarms",
topic: "domain.c.channel.>",
topic: "domain/c/channel/#",
want: "domain/c/channel/#",
},
}
@@ -60,7 +60,7 @@ func TestStreamFilter(t *testing.T) {
}
func TestQueueFilter(t *testing.T) {
got := queueFilter("writers", "writers.>")
got := queueFilter("writers", "writers/#")
want := "$queue/writers/#"
if got != want {
t.Fatalf("queue filter mismatch: got %q, want %q", got, want)
@@ -77,25 +77,25 @@ func TestTopicFilter(t *testing.T) {
{
name: "all messages with prefix",
prefix: "m",
topic: "m.>",
topic: "m/#",
want: "m/#",
},
{
name: "wildcard topic",
prefix: "writers",
topic: ">",
topic: "#",
want: "writers/#",
},
{
name: "specific topic",
prefix: "m",
topic: "m.domain.c.channel.subtopic",
topic: "m/domain/c/channel/subtopic",
want: "m/domain/c/channel/subtopic",
},
{
name: "single-level wildcard",
prefix: "m",
topic: "m.domain.c.*.subtopic",
topic: "m/domain/c/+/subtopic",
want: "m/domain/c/+/subtopic",
},
}
@@ -126,7 +126,7 @@ func TestParseMQTTTopic(t *testing.T) {
topic: "m/domain/c/channel/sub/topic",
domain: "domain",
channel: "channel",
subtopic: "sub.topic",
subtopic: "sub/topic",
},
{
name: "alternate prefix without subtopic",
@@ -142,7 +142,7 @@ func TestParseMQTTTopic(t *testing.T) {
topic: "/alarms/domain/c/channel/critical/high",
domain: "domain",
channel: "channel",
subtopic: "critical.high",
subtopic: "critical/high",
},
{
name: "mismatched prefix",
@@ -223,7 +223,7 @@ func TestParseMQTTTopicFromStreamRoutingKey(t *testing.T) {
prefix: "alarms",
domain: "dom",
channel: "ch",
subtopic: "critical.high",
subtopic: "critical/high",
},
}
for _, tc := range cases {
@@ -264,7 +264,7 @@ func TestStringHeader(t *testing.T) {
}
func TestFormatConsumerName(t *testing.T) {
got := formatConsumerName("m.domain.c.channel.>", "re/service 1")
got := formatConsumerName("m/domain/c/channel/#", "re/service 1")
want := "m_domain_c_channel__-re_service_1"
if got != want {
t.Fatalf("consumer name mismatch: got %q, want %q", got, want)
+1 -1
View File
@@ -73,7 +73,7 @@ func (pub *publisher) Publish(ctx context.Context, topic string, msg *messaging.
return err
}
subject := fmt.Sprintf("%s.%s", pub.prefix, topic)
subject := fmt.Sprintf("%s.%s", pub.prefix, toNATSTopic(topic))
if _, err = pub.js.Publish(ctx, subject, data); err != nil {
return err
}
+12 -1
View File
@@ -83,12 +83,13 @@ func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig)
// nolint:contextcheck
nh := ps.natsHandler(cfg.Handler)
natsTopic := toNATSTopic(cfg.Topic)
consumerConfig := jetstream.ConsumerConfig{
Name: formatConsumerName(cfg.Topic, cfg.ID),
Durable: formatConsumerName(cfg.Topic, cfg.ID),
Description: fmt.Sprintf("SuperMQ consumer of id %s for cfg.Topic %s", cfg.ID, cfg.Topic),
DeliverPolicy: jetstream.DeliverNewPolicy,
FilterSubject: cfg.Topic,
FilterSubject: natsTopic,
}
if cfg.Ordered {
@@ -205,6 +206,14 @@ func (ps *pubsub) handleAck(at messaging.AckType, m jetstream.Msg) {
}
}
// toNATSTopic converts a canonical /-separated topic with MQTT wildcards
// to NATS format using . separators and NATS wildcards.
var natsTopicReplacer = strings.NewReplacer("/", ".", "+", "*", "#", ">")
func toNATSTopic(topic string) string {
return natsTopicReplacer.Replace(topic)
}
func formatConsumerName(topic, id string) string {
// A durable name cannot contain whitespace, ., *, >, path separators (forward or backwards slash), and non-printable characters.
chars := []string{
@@ -214,6 +223,8 @@ func formatConsumerName(topic, id string) string {
">", "_",
"/", "_",
"\\", "_",
"+", "_",
"#", "_",
}
topic = strings.NewReplacer(chars...).Replace(topic)
+16 -16
View File
@@ -37,7 +37,7 @@ var (
func TestPublisher(t *testing.T) {
subCfg := messaging.SubscriberConfig{
ID: clientID,
Topic: fmt.Sprintf("%s.>", msgPrefix),
Topic: fmt.Sprintf("%s/#", msgPrefix),
Handler: handler{},
}
err := pubsub.Subscribe(context.TODO(), subCfg)
@@ -116,7 +116,7 @@ func TestPubsub(t *testing.T) {
}{
{
desc: "Subscribe to a topic with an ID",
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
topic: fmt.Sprintf("%s/%s", msgPrefix, topic),
clientID: "clientid1",
errorMessage: nil,
pubsub: true,
@@ -124,7 +124,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Subscribe using malformed topic and ID",
topic: fmt.Sprintf("%s.>", msgPrefix),
topic: fmt.Sprintf("%s/#", msgPrefix),
clientID: "clientid1",
errorMessage: nil,
pubsub: true,
@@ -132,7 +132,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Subscribe using malformed topic and ID",
topic: fmt.Sprintf("%s.*", msgPrefix),
topic: fmt.Sprintf("%s/+", msgPrefix),
clientID: "clientid1",
errorMessage: nil,
pubsub: true,
@@ -140,7 +140,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Subscribe to the same topic with a different ID",
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
topic: fmt.Sprintf("%s/%s", msgPrefix, topic),
clientID: "clientid2",
errorMessage: nil,
pubsub: true,
@@ -148,7 +148,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Subscribe to an already subscribed topic with an ID",
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
topic: fmt.Sprintf("%s/%s", msgPrefix, topic),
clientID: "clientid1",
errorMessage: nil,
pubsub: true,
@@ -156,7 +156,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Unsubscribe from a topic with an ID",
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
topic: fmt.Sprintf("%s/%s", msgPrefix, topic),
clientID: "clientid1",
errorMessage: nil,
pubsub: false,
@@ -172,7 +172,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Unsubscribe from the same topic with a different ID",
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
topic: fmt.Sprintf("%s/%s", msgPrefix, topic),
clientID: "clientidd2",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
@@ -180,7 +180,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Unsubscribe from the same topic with a different ID not subscribed",
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
topic: fmt.Sprintf("%s/%s", msgPrefix, topic),
clientID: "clientidd3",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
@@ -188,7 +188,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Unsubscribe from an already unsubscribed topic with an ID",
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
topic: fmt.Sprintf("%s/%s", msgPrefix, topic),
clientID: "clientid1",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
@@ -196,7 +196,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Subscribe to a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic, subtopic),
topic: fmt.Sprintf("%s/%s/%s", msgPrefix, topic, subtopic),
clientID: "clientidd1",
errorMessage: nil,
pubsub: true,
@@ -204,7 +204,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Subscribe to an already subscribed topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic, subtopic),
topic: fmt.Sprintf("%s/%s/%s", msgPrefix, topic, subtopic),
clientID: "clientidd1",
errorMessage: nil,
pubsub: true,
@@ -212,7 +212,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Unsubscribe from a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic, subtopic),
topic: fmt.Sprintf("%s/%s/%s", msgPrefix, topic, subtopic),
clientID: "clientidd1",
errorMessage: nil,
pubsub: false,
@@ -220,7 +220,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Unsubscribe from an already unsubscribed topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic, subtopic),
topic: fmt.Sprintf("%s/%s/%s", msgPrefix, topic, subtopic),
clientID: "clientid1",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
@@ -244,7 +244,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Subscribe to a topic with empty id",
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
topic: fmt.Sprintf("%s/%s", msgPrefix, topic),
clientID: "",
errorMessage: nats.ErrEmptyID,
pubsub: true,
@@ -252,7 +252,7 @@ func TestPubsub(t *testing.T) {
},
{
desc: "Unsubscribe from a topic with empty id",
topic: fmt.Sprintf("%s.%s", msgPrefix, topic),
topic: fmt.Sprintf("%s/%s", msgPrefix, topic),
clientID: "",
errorMessage: nats.ErrEmptyID,
pubsub: false,
+16 -24
View File
@@ -23,11 +23,11 @@ const (
)
var (
mqWildcards = "+#"
wildcards = "*>"
subtopicInvalidChars = " #+"
wildcardsReplacer = strings.NewReplacer("+", "*", "#", ">")
pathReplacer = strings.NewReplacer("/", ".")
// MQTT wildcards are the canonical wildcard characters.
mqttWildcards = "+#"
natsWildcards = "*>"
subtopicInvalidChars = " "
subtopicSep = "/"
DefaultCacheConfig = CacheConfig{
NumCounters: 2e5, // 200k
@@ -269,11 +269,11 @@ func ParsePublishSubtopic(subtopic string) (parseSubTopic string, err error) {
return "", errors.Wrap(ErrMalformedSubtopic, err)
}
if strings.ContainsAny(subtopic, subtopicInvalidChars+wildcards) {
if strings.ContainsAny(subtopic, subtopicInvalidChars+mqttWildcards+natsWildcards) {
return "", ErrMalformedSubtopic
}
if strings.Contains(subtopic, "..") {
if strings.Contains(subtopic, "//") {
return "", ErrMalformedSubtopic
}
@@ -298,24 +298,21 @@ func ParseSubscribeSubtopic(subtopic string) (parseSubTopic string, err error) {
return "", nil
}
if strings.ContainsAny(subtopic, mqWildcards) {
subtopic = wildcardsReplacer.Replace(subtopic)
}
subtopic, err = formatSubtopic(subtopic)
if err != nil {
return "", errors.Wrap(ErrMalformedSubtopic, err)
}
if strings.ContainsAny(subtopic, subtopicInvalidChars) {
if strings.ContainsAny(subtopic, subtopicInvalidChars+natsWildcards) {
return "", ErrMalformedSubtopic
}
if strings.Contains(subtopic, "..") {
if strings.Contains(subtopic, "//") {
return "", ErrMalformedSubtopic
}
for _, elem := range strings.Split(subtopic, ".") {
if len(elem) > 1 && strings.ContainsAny(elem, wildcards) {
for _, elem := range strings.Split(subtopic, subtopicSep) {
if len(elem) > 1 && strings.ContainsAny(elem, mqttWildcards) {
return "", ErrMalformedSubtopic
}
}
@@ -323,25 +320,24 @@ func ParseSubscribeSubtopic(subtopic string) (parseSubTopic string, err error) {
}
func formatSubtopic(subtopic string) (string, error) {
subtopic, err := url.QueryUnescape(subtopic)
subtopic, err := url.PathUnescape(subtopic)
if err != nil {
return "", err
}
subtopic = strings.TrimPrefix(subtopic, "/")
subtopic = strings.TrimSuffix(subtopic, "/")
subtopic = strings.TrimSpace(subtopic)
subtopic = pathReplacer.Replace(subtopic)
return subtopic, nil
}
func EncodeTopic(domainID string, channelID string, subtopic string) string {
return fmt.Sprintf("%s.%s", string(MsgTopicPrefix), EncodeTopicSuffix(domainID, channelID, subtopic))
return fmt.Sprintf("%s/%s", string(MsgTopicPrefix), EncodeTopicSuffix(domainID, channelID, subtopic))
}
func EncodeTopicSuffix(domainID string, channelID string, subtopic string) string {
subject := fmt.Sprintf("%s.%s.%s", domainID, string(ChannelTopicPrefix), channelID)
subject := fmt.Sprintf("%s/%s/%s", domainID, string(ChannelTopicPrefix), channelID)
if subtopic != "" {
subject = fmt.Sprintf("%s.%s", subject, subtopic)
subject = fmt.Sprintf("%s/%s", subject, subtopic)
}
return subject
}
@@ -351,11 +347,7 @@ func EncodeMessageTopic(m *Message) string {
}
func EncodeMessageMQTTTopic(m *Message) string {
topic := fmt.Sprintf("%s/%s/%s/%s", string(MsgTopicPrefix), m.GetDomain(), string(ChannelTopicPrefix), m.GetChannel())
if m.GetSubtopic() != "" {
topic = topic + "/" + strings.ReplaceAll(m.GetSubtopic(), ".", "/")
}
return topic
return EncodeTopic(m.GetDomain(), m.GetChannel(), m.GetSubtopic())
}
func encodeAdapterTopic(domain, channel, subtopic string, topicType TopicType) string {
+19 -19
View File
@@ -65,7 +65,7 @@ var ParsePublisherTopicTestCases = []struct {
topic: "/m/domain123/c/channel456/devices/temp",
domainID: "domain123",
channelID: "channel456",
subtopic: "devices.temp",
subtopic: "devices/temp",
topicType: messaging.MessageType,
err: nil,
},
@@ -74,7 +74,7 @@ var ParsePublisherTopicTestCases = []struct {
topic: "/m/domain123/c/channel456/devices%2Ftemp%2Fdata",
domainID: "domain123",
channelID: "channel456",
subtopic: "devices.temp.data",
subtopic: "devices/temp/data",
topicType: messaging.MessageType,
},
{
@@ -82,7 +82,7 @@ var ParsePublisherTopicTestCases = []struct {
topic: "/m/domain/c/channel/extra/extra2",
domainID: "domain",
channelID: "channel",
subtopic: "extra.extra2",
subtopic: "extra/extra2",
topicType: messaging.MessageType,
},
{
@@ -98,7 +98,7 @@ var ParsePublisherTopicTestCases = []struct {
topic: "/m/domain123/c/channel456/devices/temp/",
domainID: "domain123",
channelID: "channel456",
subtopic: "devices.temp",
subtopic: "devices/temp",
topicType: messaging.MessageType,
},
{
@@ -253,7 +253,7 @@ var ParseSubscribeTestCases = []struct {
topic: "/m/domain123/c/channel456/devices/temp",
domainID: "domain123",
channelID: "channel456",
subtopic: "devices.temp",
subtopic: "devices/temp",
topicType: messaging.MessageType,
},
{
@@ -261,7 +261,7 @@ var ParseSubscribeTestCases = []struct {
topic: "/m/domain123/c/channel456/devices/+/temp/#",
domainID: "domain123",
channelID: "channel456",
subtopic: "devices.*.temp.>",
subtopic: "devices/+/temp/#",
topicType: messaging.MessageType,
},
{
@@ -277,7 +277,7 @@ var ParseSubscribeTestCases = []struct {
topic: "/m/domain123/c/channel456/devices/temp/",
domainID: "domain123",
channelID: "channel456",
subtopic: "devices.temp",
subtopic: "devices/temp",
topicType: messaging.MessageType,
},
{
@@ -323,11 +323,11 @@ var ParseSubscribeTestCases = []struct {
err: messaging.ErrMalformedTopic,
},
{
desc: "invalid domain name m/domain*123/c/channel456/devices/+/temp/#",
desc: "valid domain with wildcards m/domain*123/c/channel456/devices/+/temp/#",
topic: "m/domain*123/c/channel456/devices/+/temp/#",
domainID: "domain*123",
channelID: "channel456",
subtopic: "devices.*.temp.>",
subtopic: "devices/+/temp/#",
topicType: messaging.MessageType,
},
{
@@ -430,15 +430,15 @@ func TestEncodeTopic(t *testing.T) {
desc: "with subtopic",
domainID: "domain1",
channelID: "chan1",
subtopic: "dev.sensor.temp",
expected: "m.domain1.c.chan1.dev.sensor.temp",
subtopic: "dev/sensor/temp",
expected: "m/domain1/c/chan1/dev/sensor/temp",
},
{
desc: "without subtopic",
domainID: "domain1",
channelID: "chan1",
subtopic: "",
expected: "m.domain1.c.chan1",
expected: "m/domain1/c/chan1",
},
}
@@ -462,15 +462,15 @@ func TestEncodeTopicSuffix(t *testing.T) {
desc: "with subtopic",
domainID: "domain1",
channelID: "chan1",
subtopic: "dev.sensor.temp",
expected: "domain1.c.chan1.dev.sensor.temp",
subtopic: "dev/sensor/temp",
expected: "domain1/c/chan1/dev/sensor/temp",
},
{
desc: "without subtopic",
domainID: "domain1",
channelID: "chan1",
subtopic: "",
expected: "domain1.c.chan1",
expected: "domain1/c/chan1",
},
}
@@ -493,9 +493,9 @@ func TestMessage_EncodeTopicSuffix(t *testing.T) {
message: &messaging.Message{
Domain: "domainX",
Channel: "chanX",
Subtopic: "device.123.status",
Subtopic: "device/123/status",
},
expected: "domainX.c.chanX.device.123.status",
expected: "domainX/c/chanX/device/123/status",
},
{
desc: "without subtopic",
@@ -503,7 +503,7 @@ func TestMessage_EncodeTopicSuffix(t *testing.T) {
Domain: "domainY",
Channel: "chanY",
},
expected: "domainY.c.chanY",
expected: "domainY/c/chanY",
},
}
@@ -526,7 +526,7 @@ func TestMessage_EncodeToMQTTTopic(t *testing.T) {
message: &messaging.Message{
Domain: "domainA",
Channel: "chanA",
Subtopic: "dev.1.temp",
Subtopic: "dev/1/temp",
},
expected: "m/domainA/c/chanA/dev/1/temp",
},
+4 -5
View File
@@ -24,13 +24,12 @@ type publishRequest struct {
}
func (sdk mgSDK) SendMessage(ctx context.Context, domainID, topic, msg, secret string) errors.SDKError {
chanNameParts := strings.SplitN(topic, ".", channelParts)
chanNameParts := strings.SplitN(topic, "/", channelParts)
chanID := chanNameParts[0]
brokerTopic := fmt.Sprintf("m/%s/c/%s", domainID, chanID)
if len(chanNameParts) == channelParts {
brokerTopic = fmt.Sprintf("%s/%s", brokerTopic, strings.ReplaceAll(chanNameParts[1], ".", "/"))
brokerTopic = fmt.Sprintf("%s/%s", brokerTopic, chanNameParts[1])
}
data, err := json.Marshal(publishRequest{
Topic: brokerTopic,
Payload: []byte(msg),
@@ -40,11 +39,11 @@ func (sdk mgSDK) SendMessage(ctx context.Context, domainID, topic, msg, secret s
}
headers := map[string]string{
"X-FluxMQ-Password": secret,
"X-FluxMQ-Username": domainID,
}
reqURL := fmt.Sprintf("%s/publish", sdk.httpAdapterURL)
_, _, sdkErr := sdk.processRequest(ctx, http.MethodPost, reqURL, "", data, headers, http.StatusOK)
_, _, sdkErr := sdk.processRequest(ctx, http.MethodPost, reqURL, secret, data, headers, http.StatusOK)
return sdkErr
}
+4 -3
View File
@@ -29,8 +29,9 @@ func setupFluxMQ(secret string, expectedTopic ...string) *httptest.Server {
mux := http.NewServeMux()
mux.HandleFunc("POST /publish", func(w http.ResponseWriter, r *http.Request) {
password := r.Header.Get("X-FluxMQ-Password")
if password == "" || password != secret {
username := r.Header.Get("X-FluxMQ-Username")
auth := r.Header.Get("Authorization")
if username == "" || auth == "" || auth != "Bearer "+secret {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
@@ -94,7 +95,7 @@ func TestSendMessage(t *testing.T) {
},
{
desc: "publish message with subtopic",
topic: "channelID.sub.topic",
topic: "channelID/sub/topic",
domainID: "domainID",
wantTopic: "m/domainID/c/channelID/sub/topic",
msg: `[{"n":"current","t":-1,"v":1.6}]`,
+1 -1
View File
@@ -16,7 +16,7 @@ import (
)
func (sdk mgSDK) ReadMessages(ctx context.Context, pm MessagePageMetadata, chanName, domainID, token string) (MessagesPage, errors.SDKError) {
chanNameParts := strings.SplitN(chanName, ".", channelParts)
chanNameParts := strings.SplitN(chanName, "/", channelParts)
chanID := chanNameParts[0]
subtopicPart := ""
if len(chanNameParts) == channelParts {
+1 -1
View File
@@ -100,7 +100,7 @@ func TestReadMessages(t *testing.T) {
{
desc: "read messages successfully with subtopic",
token: validToken,
chanName: channelID + ".subtopic",
chanName: channelID + "/subtopic",
domainID: validID,
messagePageMeta: sdk.MessagePageMetadata{
PageMetadata: sdk.PageMetadata{
+1 -1
View File
@@ -1262,7 +1262,7 @@ type SDK interface {
// example:
// ctx := context.Background()
// msg := '[{"bn":"some-base-name:","bt":1.276020076001e+09, "bu":"A","bver":5, "n":"voltage","u":"V","v":120.1}, {"n":"current","t":-5,"v":1.2}, {"n":"current","t":-4,"v":1.3}]'
// err := sdk.SendMessage(ctx, "domainID", "topic", msg, "clientSecret")
// err := sdk.SendMessage(ctx, "domainID", "76cc9425-9df0-4b53-99b8-8dabbd3444fc/test", msg, "clientSecret")
// fmt.Println(err)
SendMessage(ctx context.Context, domainID, topic, msg, secret string) smqerrors.SDKError
+1 -1
View File
@@ -50,5 +50,5 @@ the message format is `myFormat`. It can be any valid subtopic name, JSON transf
Having a message format in the subtopic means that the subscriber has an option to subscribe to only one message format. This is a nice feature because message subscribers know what's the expected format of the message so that they can process it. If the message format is not important, wildcard subtopic can always be used to subscribe to any message format:
```
http://localhost:8185/m/<domain_id>/c/<channelID>/home/temperature/*
http://localhost:8185/m/<domain_id>/c/<channelID>/home/temperature/#
```
+1 -1
View File
@@ -61,7 +61,7 @@ func (ts *transformerService) Transform(msg *messaging.Message) (any, error) {
return nil, errors.Wrap(ErrTransform, errUnknownFormat)
}
subs := strings.Split(ret.Subtopic, ".")
subs := strings.Split(ret.Subtopic, "/")
if len(subs) == 0 {
return nil, errors.Wrap(ErrTransform, errUnknownFormat)
}
+6 -4
View File
@@ -100,7 +100,7 @@ The service is configured using the following environment variables (values show
- **Rule execution**: Runs Lua or Go scripts for incoming messages.
- **Multiple outputs**: Channels, alarms, email, SenML writers, remote PostgreSQL, and Slack outputs.
- **Scheduling**: Runs rules at specific times with recurring intervals.
- **Filtering and matching**: Input channel filtering and NATS-style topic matching (`*`, `>`).
- **Filtering and matching**: Input channel filtering and MQTT-style topic matching (`+`, `#`).
- **Observability**: `/metrics` Prometheus endpoint and Jaeger tracing support.
- **Payload limit**: Messages over 100 kB are rejected for processing.
@@ -110,7 +110,7 @@ The service is configured using the following environment variables (values show
1. The service subscribes to all internal broker messages.
2. For each message, it lists enabled rules for the same domain and input channel.
3. It matches the rule `input_topic` against the message subtopic using NATS-style wildcards.
3. It matches the rule `input_topic` against the message subtopic using MQTT-style wildcards.
4. The rule logic (Lua or Go) is executed and the result is passed to configured outputs.
### Message payloads
@@ -154,6 +154,8 @@ Supported output types (`outputs.OutputType`) and their fields:
| `save_remote_pg` | `host`, `port`, `user`, `password`, `database`, `table`, `mapping` | `mapping` is a Go template that must render a JSON object. |
| `slack` | `token`, `channel_id`, `message` | `message` is a Go template. |
For `channels` output, `topic` is a slash-delimited subtopic (for example, `alerts/high-temp`).
Templates receive a `Message` (the incoming message) and a `Result` (the script output) value.
## Data model
@@ -174,7 +176,7 @@ Defined in `re/postgres/init.go`:
| `updated_at` | `TIMESTAMP` | Last update timestamp |
| `updated_by` | `VARCHAR(254)` | Last updater user ID |
| `input_channel` | `VARCHAR(36)` | Input channel ID |
| `input_topic` | `TEXT` | Input topic (supports wildcards) |
| `input_topic` | `TEXT` | Input topic (supports `+` and `#` wildcards) |
| `outputs` | `JSONB` | Output definitions |
| `status` | `SMALLINT` | 0 = enabled, 1 = disabled, 2 = deleted |
| `logic_type` | `SMALLINT` | 0 = Lua, 1 = Go |
@@ -258,7 +260,7 @@ curl -X POST http://localhost:9008/<domainID>/rules \
-d '{
"name": "High Temperature Alert",
"input_channel": "sensors",
"input_topic": "temperature.*",
"input_topic": "temperature/+",
"logic": {
"type": 0,
"value": "if message.payload.t > 30 then return {measurement=\"temperature\", value=tostring(message.payload.t), unit=\"C\", threshold=\"30\", cause=\"temp high\", severity=90} end"
+8 -7
View File
@@ -48,7 +48,7 @@ func (re *re) Handle(msg *messaging.Message) error {
return err
}
for _, r := range page.Rules {
if matchSubject(msg.Subtopic, r.InputTopic) {
if matchTopic(msg.Subtopic, r.InputTopic) {
go func(ctx context.Context) {
re.runInfo <- re.process(ctx, r, msg)
}(ctx)
@@ -58,20 +58,21 @@ func (re *re) Handle(msg *messaging.Message) error {
return nil
}
// Match NATS subject to support wildcards.
func matchSubject(published, subscribed string) bool {
p := strings.Split(published, ".")
s := strings.Split(subscribed, ".")
// matchTopic matches a published subtopic against a subscription pattern
// using MQTT-style wildcards: + (single level) and # (multi-level).
func matchTopic(published, subscribed string) bool {
p := strings.Split(published, "/")
s := strings.Split(subscribed, "/")
n := len(p)
for i := range s {
if s[i] == ">" {
if s[i] == "#" {
return true
}
if i >= n {
return false
}
if s[i] != "*" && p[i] != s[i] {
if s[i] != "+" && p[i] != s[i] {
return false
}
}
+68 -4
View File
@@ -62,13 +62,77 @@ func Migration() (*migrate.MemoryMigrationSource, error) {
Id: "rules_03",
Up: []string{
`UPDATE rules
SET metadata = (COALESCE(metadata, '{}'::jsonb) - 'ui') || jsonb_build_object('flow', metadata->'ui')
WHERE metadata ? 'ui' AND jsonb_typeof(metadata->'ui') = 'string'`,
SET metadata = (COALESCE(metadata, '{}'::jsonb) - 'ui') || jsonb_build_object('flow', metadata->'ui')
WHERE metadata ? 'ui' AND jsonb_typeof(metadata->'ui') = 'string'`,
},
Down: []string{
`UPDATE rules
SET metadata = (COALESCE(metadata, '{}'::jsonb) - 'flow') || jsonb_build_object('ui', metadata->'flow')
WHERE metadata ? 'flow' AND jsonb_typeof(metadata->'flow') = 'string'`,
SET metadata = (COALESCE(metadata, '{}'::jsonb) - 'flow') || jsonb_build_object('ui', metadata->'flow')
WHERE metadata ? 'flow' AND jsonb_typeof(metadata->'flow') = 'string'`,
},
},
{
Id: "rules_04",
Up: []string{
// Canonicalize legacy rule topics from dot/NATS wildcards
// to slash/MQTT wildcards.
`UPDATE rules
SET input_topic = REPLACE(REPLACE(REPLACE(input_topic, '>', '#'), '*', '+'), '.', '/')
WHERE input_topic IS NOT NULL AND input_topic <> ''`,
},
Down: []string{
`UPDATE rules
SET input_topic = REPLACE(REPLACE(REPLACE(input_topic, '#', '>'), '+', '*'), '/', '.')
WHERE input_topic IS NOT NULL AND input_topic <> ''`,
},
},
{
Id: "rules_05",
Up: []string{
// Canonicalize channel output topics in rule outputs JSON:
// dot/NATS wildcards -> slash/MQTT wildcards.
`UPDATE rules AS r
SET outputs = COALESCE((
SELECT jsonb_agg(
CASE
WHEN out_elem.elem->>'type' = 'channels'
AND out_elem.elem ? 'topic'
AND jsonb_typeof(out_elem.elem->'topic') = 'string'
THEN jsonb_set(
out_elem.elem,
'{topic}',
to_jsonb(REPLACE(REPLACE(REPLACE(out_elem.elem->>'topic', '>', '#'), '*', '+'), '.', '/')),
false
)
ELSE out_elem.elem
END
ORDER BY out_elem.ord
)
FROM jsonb_array_elements(r.outputs) WITH ORDINALITY AS out_elem(elem, ord)
), '[]'::jsonb)
WHERE jsonb_typeof(r.outputs) = 'array'`,
},
Down: []string{
`UPDATE rules AS r
SET outputs = COALESCE((
SELECT jsonb_agg(
CASE
WHEN out_elem.elem->>'type' = 'channels'
AND out_elem.elem ? 'topic'
AND jsonb_typeof(out_elem.elem->'topic') = 'string'
THEN jsonb_set(
out_elem.elem,
'{topic}',
to_jsonb(REPLACE(REPLACE(REPLACE(out_elem.elem->>'topic', '#', '>'), '+', '*'), '/', '.')),
false
)
ELSE out_elem.elem
END
ORDER BY out_elem.ord
)
FROM jsonb_array_elements(r.outputs) WITH ORDINALITY AS out_elem(elem, ord)
), '[]'::jsonb)
WHERE jsonb_typeof(r.outputs) = 'array'`,
},
},
},
+1
View File
@@ -215,6 +215,7 @@ The Reports service supports the following operations:
List filters: `offset`, `limit`, `status`, `name`, `order` (`name`, `created_at`, `updated_at`), and `dir` (`asc`, `desc`).
Time ranges use relative expressions parsed by `pkg/reltime`, such as `now()` or `now()-24h` (units: `s`, `m`, `h`, `d`, `w`). Aggregation intervals use Go duration strings like `15m` or `1h`. File output formats are `pdf` and `csv`.
When metric `subtopic` is used, provide it in slash-delimited form (for example, `sensor/temp`).
### Example: Generate a report
+47
View File
@@ -54,6 +54,53 @@ func Migration() (*migrate.MemoryMigrationSource, error) {
`ALTER TABLE report_config DROP COLUMN report_template;`,
},
},
{
Id: "reports_03",
Up: []string{
// Canonicalize legacy report metric subtopics from dot/NATS wildcards
// to slash/MQTT wildcards.
`UPDATE report_config AS rc
SET metrics = COALESCE((
SELECT jsonb_agg(
CASE
WHEN metric.elem ? 'subtopic'
AND jsonb_typeof(metric.elem->'subtopic') = 'string'
THEN jsonb_set(
metric.elem,
'{subtopic}',
to_jsonb(REPLACE(REPLACE(REPLACE(metric.elem->>'subtopic', '>', '#'), '*', '+'), '.', '/')),
false
)
ELSE metric.elem
END
ORDER BY metric.ord
)
FROM jsonb_array_elements(rc.metrics) WITH ORDINALITY AS metric(elem, ord)
), '[]'::jsonb)
WHERE jsonb_typeof(rc.metrics) = 'array'`,
},
Down: []string{
`UPDATE report_config AS rc
SET metrics = COALESCE((
SELECT jsonb_agg(
CASE
WHEN metric.elem ? 'subtopic'
AND jsonb_typeof(metric.elem->'subtopic') = 'string'
THEN jsonb_set(
metric.elem,
'{subtopic}',
to_jsonb(REPLACE(REPLACE(REPLACE(metric.elem->>'subtopic', '#', '>'), '+', '*'), '/', '.')),
false
)
ELSE metric.elem
END
ORDER BY metric.ord
)
FROM jsonb_array_elements(rc.metrics) WITH ORDINALITY AS metric(elem, ord)
), '[]'::jsonb)
WHERE jsonb_typeof(rc.metrics) = 'array'`,
},
},
},
}