diff --git a/alarms/brokers/brokers_nats.go b/alarms/brokers/brokers_nats.go index 562bf55ec..b767f0703 100644 --- a/alarms/brokers/brokers_nats.go +++ b/alarms/brokers/brokers_nats.go @@ -17,7 +17,7 @@ import ( ) const ( - AllTopic = "alarms.>" + AllTopic = "alarms/#" prefix = "alarms" ) diff --git a/apidocs/openapi/notifiers.yaml b/apidocs/openapi/notifiers.yaml index f5bc2bb3f..c16c20008 100644 --- a/apidocs/openapi/notifiers.yaml +++ b/apidocs/openapi/notifiers.yaml @@ -155,7 +155,7 @@ components: description: An id of the owner who created subscription. topic: type: string - example: topic.subtopic + example: topic/subtopic description: Topic to which the user subscribes. contact: type: string @@ -166,7 +166,7 @@ components: properties: topic: type: string - example: topic.subtopic + example: topic/subtopic description: Topic to which the user subscribes. contact: type: string diff --git a/cli/README.md b/cli/README.md index 4f9d71b80..a75c7d5e1 100644 --- a/cli/README.md +++ b/cli/README.md @@ -269,7 +269,7 @@ supermq-cli channels connections #### Send a message over HTTP ```bash -supermq-cli messages send '[{"bn":"Dev1","n":"temp","v":20}, {"n":"hum","v":40}, {"bn":"Dev2", "n":"temp","v":20}, {"n":"hum","v":40}]' +supermq-cli messages send '[{"bn":"Dev1","n":"temp","v":20}, {"n":"hum","v":40}, {"bn":"Dev2", "n":"temp","v":20}, {"n":"hum","v":40}]' ``` ### Groups diff --git a/cli/message.go b/cli/message.go index 356a72dc2..91b252453 100644 --- a/cli/message.go +++ b/cli/message.go @@ -7,7 +7,7 @@ import "github.com/spf13/cobra" var cmdMessages = []cobra.Command{ { - Use: "send ", + Use: "send ", Short: "Send messages", Long: `Sends message on the channel`, Run: func(cmd *cobra.Command, args []string) { diff --git a/cli/message_test.go b/cli/message_test.go index 7f90f0c64..dcb0b37dd 100644 --- a/cli/message_test.go +++ b/cli/message_test.go @@ -35,9 +35,9 @@ func TestSendMesageCmd(t *testing.T) { { desc: "send message successfully", args: []string{ + domainID, channel.ID, message, - domainID, client.Credentials.Secret, }, logType: okLog, @@ -45,10 +45,10 @@ func TestSendMesageCmd(t *testing.T) { { desc: "send message with invalid args", args: []string{ + domainID, channel.ID, message, client.Credentials.Secret, - domainID, extraArg, }, logType: usageLog, @@ -56,9 +56,9 @@ func TestSendMesageCmd(t *testing.T) { { desc: "send message with invalid client secret", args: []string{ + domainID, channel.ID, message, - domainID, "invalid_secret", }, sdkErr: errors.NewSDKErrorWithStatus(errors.Wrap(svcerr.ErrAuthentication, errors.Wrap(svcerr.ErrAuthorization, svcerr.ErrNotFound)), http.StatusBadRequest), diff --git a/clients/service.go b/clients/service.go index ea8b7da95..3c2b7664e 100644 --- a/clients/service.go +++ b/clients/service.go @@ -208,6 +208,9 @@ func (svc service) UpdateSecret(ctx context.Context, session authn.Session, id, if err != nil { return Client{}, errors.Wrap(svcerr.ErrUpdateEntity, err) } + if err := svc.cache.Remove(ctx, client.ID); err != nil { + return client, errors.Wrap(svcerr.ErrRemoveEntity, err) + } return client, nil } diff --git a/clients/service_test.go b/clients/service_test.go index f0c92904f..88bf85b70 100644 --- a/clients/service_test.go +++ b/clients/service_test.go @@ -707,6 +707,7 @@ func TestUpdateSecret(t *testing.T) { updateSecretResponse clients.Client session smqauthn.Session updateErr error + removeErr error err error }{ { @@ -732,15 +733,37 @@ func TestUpdateSecret(t *testing.T) { updateErr: repoerr.ErrMalformedEntity, err: svcerr.ErrUpdateEntity, }, + { + desc: "update client secret with failed to remove cache", + client: client, + newSecret: "newSecret", + session: smqauthn.Session{UserID: validID}, + updateSecretResponse: clients.Client{ + ID: client.ID, + Credentials: clients.Credentials{ + Identity: client.Credentials.Identity, + Secret: "newSecret", + }, + }, + removeErr: repoerr.ErrRemoveEntity, + err: svcerr.ErrRemoveEntity, + }, } for _, tc := range cases { t.Run(tc.desc, func(t *testing.T) { repoCall := repo.On("UpdateSecret", context.Background(), mock.Anything).Return(tc.updateSecretResponse, tc.updateErr) + var cacheCall *mock.Call + if tc.updateErr == nil { + cacheCall = cache.On("Remove", context.Background(), tc.updateSecretResponse.ID).Return(tc.removeErr) + } updatedClient, err := svc.UpdateSecret(context.Background(), tc.session, tc.client.ID, tc.newSecret) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) assert.Equal(t, tc.updateSecretResponse, updatedClient, fmt.Sprintf("%s: expected %v got %v\n", tc.desc, tc.updateSecretResponse, updatedClient)) repoCall.Unset() + if cacheCall != nil { + cacheCall.Unset() + } }) } } diff --git a/cmd/clients/main.go b/cmd/clients/main.go index d9d8e7c46..0811b2f88 100644 --- a/cmd/clients/main.go +++ b/cmd/clients/main.go @@ -92,7 +92,7 @@ type config struct { StandaloneID string `env:"MG_CLIENTS_STANDALONE_ID" envDefault:""` StandaloneToken string `env:"MG_CLIENTS_STANDALONE_TOKEN" envDefault:""` CacheURL string `env:"MG_CLIENTS_CACHE_URL" envDefault:"redis://localhost:6379/0"` - CacheKeyDuration time.Duration `env:"MG_CLIENTS_CACHE_KEY_DURATION" envDefault:"10m"` + CacheKeyDuration time.Duration `env:"MG_CLIENTS_CACHE_KEY_DURATION" envDefault:"1h"` JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"` ESURL string `env:"MG_ES_URL" envDefault:"amqp://guest:guest@localhost:5682/"` diff --git a/consumers/README.md b/consumers/README.md index 7b175df46..97ac1570c 100644 --- a/consumers/README.md +++ b/consumers/README.md @@ -65,7 +65,7 @@ curl -X POST http://localhost:9014/subscriptions \ -H "Authorization: Bearer " \ -H "Content-Type: application/json" \ -d '{ - "topic": "some.topic.subtopic", + "topic": "some/topic/subtopic", "contact": "user@example.com" }' ``` diff --git a/consumers/notifiers/README.md b/consumers/notifiers/README.md index ff16de480..0320e355b 100644 --- a/consumers/notifiers/README.md +++ b/consumers/notifiers/README.md @@ -10,15 +10,15 @@ The service is configured using environment variables. Values shown are from [do Used by `consumers/notifiers/smtp` via `internal/email`. -| Variable | Description | Default | -| --- | --- | --- | -| `MG_EMAIL_HOST` | SMTP host | `smtp.mailtrap.io` | -| `MG_EMAIL_PORT` | SMTP port | `2525` | -| `MG_EMAIL_USERNAME` | SMTP username | `18bf7f70705139` | -| `MG_EMAIL_PASSWORD` | SMTP password | `2b0d302e775b1e` | +| Variable | Description | Default | +| ----------------------- | ---------------------------------------------- | ------------------ | +| `MG_EMAIL_HOST` | SMTP host | `smtp.mailtrap.io` | +| `MG_EMAIL_PORT` | SMTP port | `2525` | +| `MG_EMAIL_USERNAME` | SMTP username | `18bf7f70705139` | +| `MG_EMAIL_PASSWORD` | SMTP password | `2b0d302e775b1e` | | `MG_EMAIL_FROM_ADDRESS` | Default from address (used if `from` is empty) | `from@example.com` | -| `MG_EMAIL_FROM_NAME` | Default from name | `Example` | -| `MG_EMAIL_TEMPLATE` | Email template path | `email.tmpl` | +| `MG_EMAIL_FROM_NAME` | Default from name | `Example` | +| `MG_EMAIL_TEMPLATE` | Email template path | `email.tmpl` | ### SMPP notifier (SMS) @@ -26,16 +26,16 @@ Used by `consumers/notifiers/smtp` via `internal/email`. Defined in `consumers/notifiers/smpp/config.go`. -| Variable | Description | Default | -| --- | --- | --- | -| `MG_SMPP_ADDRESS` | SMPP address in `host:port` format | "" | -| `MG_SMPP_USERNAME` | SMPP username | "" | -| `MG_SMPP_PASSWORD` | SMPP password | "" | -| `MG_SMPP_SYSTEM_TYPE` | SMPP system type | "" | -| `MG_SMPP_SRC_ADDR_TON` | SMPP source address TON | `0` | -| `MG_SMPP_DST_ADDR_TON` | SMPP source address NPI | `0` | -| `MG_SMPP_SRC_ADDR_NPI` | SMPP destination address TON | `0` | -| `MG_SMPP_DST_ADDR_NPI` | SMPP destination address NPI | `0` | +| Variable | Description | Default | +| ---------------------- | ---------------------------------- | ------- | +| `MG_SMPP_ADDRESS` | SMPP address in `host:port` format | "" | +| `MG_SMPP_USERNAME` | SMPP username | "" | +| `MG_SMPP_PASSWORD` | SMPP password | "" | +| `MG_SMPP_SYSTEM_TYPE` | SMPP system type | "" | +| `MG_SMPP_SRC_ADDR_TON` | SMPP source address TON | `0` | +| `MG_SMPP_DST_ADDR_TON` | SMPP source address NPI | `0` | +| `MG_SMPP_SRC_ADDR_NPI` | SMPP destination address TON | `0` | +| `MG_SMPP_DST_ADDR_NPI` | SMPP destination address NPI | `0` | Note: The SMPP env tags are mapped exactly as defined in `consumers/notifiers/smpp/config.go`. @@ -43,32 +43,32 @@ Note: The SMPP env tags are mapped exactly as defined in `consumers/notifiers/sm Defined in `consumers/notifiers/smpp/README.md`. -| Variable | Description | Default | -| --- | --- | --- | -| `MG_SMPP_NOTIFIER_LOG_LEVEL` | Log level for SMPP notifier | `info` | -| `MG_SMPP_NOTIFIER_FROM_ADDRESS` | From address for SMS notifications | "" | -| `MG_SMPP_NOTIFIER_CONFIG_PATH` | Config file path for message broker subjects and payload type | `/config.toml` | -| `MG_SMPP_NOTIFIER_HTTP_HOST` | Service HTTP host | `localhost` | -| `MG_SMPP_NOTIFIER_HTTP_PORT` | Service HTTP port | `9014` | -| `MG_SMPP_NOTIFIER_HTTP_SERVER_CERT` | Service HTTP server certificate path | "" | -| `MG_SMPP_NOTIFIER_HTTP_SERVER_KEY` | Service HTTP server key path | "" | -| `MG_SMPP_NOTIFIER_DB_HOST` | Database host address | `localhost` | -| `MG_SMPP_NOTIFIER_DB_PORT` | Database host port | `5432` | -| `MG_SMPP_NOTIFIER_DB_USER` | Database user | `magistrala` | -| `MG_SMPP_NOTIFIER_DB_PASS` | Database password | `magistrala` | -| `MG_SMPP_NOTIFIER_DB_NAME` | Database name | `subscriptions` | -| `MG_SMPP_NOTIFIER_DB_SSL_MODE` | DB SSL mode (disable, require, verify-ca, verify-full) | `disable` | -| `MG_SMPP_NOTIFIER_DB_SSL_CERT` | DB SSL client cert path | "" | -| `MG_SMPP_NOTIFIER_DB_SSL_KEY` | DB SSL client key path | "" | -| `MG_SMPP_NOTIFIER_DB_SSL_ROOT_CERT` | DB SSL root cert path | "" | -| `MG_AUTH_GRPC_URL` | Auth gRPC URL | `localhost:7001` | -| `MG_AUTH_GRPC_TIMEOUT` | Auth gRPC timeout | `1s` | -| `MG_AUTH_GRPC_CLIENT_TLS` | Auth client TLS flag | `false` | -| `MG_AUTH_GRPC_CA_CERT` | Auth client CA certs path | "" | -| `MG_MESSAGE_BROKER_URL` | Message broker URL | `nats://127.0.0.1:4222` | -| `MG_JAEGER_URL` | Jaeger tracing URL | `http://jaeger:14268/api/traces` | -| `MG_SEND_TELEMETRY` | Send telemetry to Magistrala call-home server | `true` | -| `MG_SMPP_NOTIFIER_INSTANCE_ID` | SMPP notifier instance ID | "" | +| Variable | Description | Default | +| ----------------------------------- | ------------------------------------------------------------- | -------------------------------- | +| `MG_SMPP_NOTIFIER_LOG_LEVEL` | Log level for SMPP notifier | `info` | +| `MG_SMPP_NOTIFIER_FROM_ADDRESS` | From address for SMS notifications | "" | +| `MG_SMPP_NOTIFIER_CONFIG_PATH` | Config file path for message broker subjects and payload type | `/config.toml` | +| `MG_SMPP_NOTIFIER_HTTP_HOST` | Service HTTP host | `localhost` | +| `MG_SMPP_NOTIFIER_HTTP_PORT` | Service HTTP port | `9014` | +| `MG_SMPP_NOTIFIER_HTTP_SERVER_CERT` | Service HTTP server certificate path | "" | +| `MG_SMPP_NOTIFIER_HTTP_SERVER_KEY` | Service HTTP server key path | "" | +| `MG_SMPP_NOTIFIER_DB_HOST` | Database host address | `localhost` | +| `MG_SMPP_NOTIFIER_DB_PORT` | Database host port | `5432` | +| `MG_SMPP_NOTIFIER_DB_USER` | Database user | `magistrala` | +| `MG_SMPP_NOTIFIER_DB_PASS` | Database password | `magistrala` | +| `MG_SMPP_NOTIFIER_DB_NAME` | Database name | `subscriptions` | +| `MG_SMPP_NOTIFIER_DB_SSL_MODE` | DB SSL mode (disable, require, verify-ca, verify-full) | `disable` | +| `MG_SMPP_NOTIFIER_DB_SSL_CERT` | DB SSL client cert path | "" | +| `MG_SMPP_NOTIFIER_DB_SSL_KEY` | DB SSL client key path | "" | +| `MG_SMPP_NOTIFIER_DB_SSL_ROOT_CERT` | DB SSL root cert path | "" | +| `MG_AUTH_GRPC_URL` | Auth gRPC URL | `localhost:7001` | +| `MG_AUTH_GRPC_TIMEOUT` | Auth gRPC timeout | `1s` | +| `MG_AUTH_GRPC_CLIENT_TLS` | Auth client TLS flag | `false` | +| `MG_AUTH_GRPC_CA_CERT` | Auth client CA certs path | "" | +| `MG_MESSAGE_BROKER_URL` | Message broker URL | `nats://127.0.0.1:4222` | +| `MG_JAEGER_URL` | Jaeger tracing URL | `http://jaeger:14268/api/traces` | +| `MG_SEND_TELEMETRY` | Send telemetry to Magistrala call-home server | `true` | +| `MG_SMPP_NOTIFIER_INSTANCE_ID` | SMPP notifier instance ID | "" | ## Features @@ -84,7 +84,7 @@ Defined in `consumers/notifiers/smpp/README.md`. 1. Clients register subscriptions through the HTTP API (`topic` + `contact`). 2. The service authenticates the token, assigns an owner ID, and persists the subscription. -3. When a message arrives, the service builds the topic as `channel` or `channel.subtopic`, retrieves matching subscriptions, and gathers contacts. +3. When a message arrives, the service builds the topic as `channel` or `channel/subtopic`, retrieves matching subscriptions, and gathers contacts. 4. The notifier implementation sends notifications using the configured backend. ### Components @@ -99,12 +99,12 @@ Defined in `consumers/notifiers/smpp/README.md`. Defined in `consumers/notifiers/postgres/init.go`: -| Column | Type | Description | -| --- | --- | --- | -| `id` | `VARCHAR(254)` | Subscription identifier (primary key) | -| `owner_id` | `VARCHAR(254)` | Owner ID derived from the auth token | -| `contact` | `VARCHAR(254)` | Notification contact (email or phone) | -| `topic` | `TEXT` | Topic to match (`channel` or `channel.subtopic`) | +| Column | Type | Description | +| ---------- | -------------- | ------------------------------------------------ | +| `id` | `VARCHAR(254)` | Subscription identifier (primary key) | +| `owner_id` | `VARCHAR(254)` | Owner ID derived from the auth token | +| `contact` | `VARCHAR(254)` | Notification contact (email or phone) | +| `topic` | `TEXT` | Topic to match (`channel` or `channel/subtopic`) | Constraint: `UNIQUE(topic, contact)` @@ -129,13 +129,13 @@ go test ./consumers/notifiers/... The Notifiers service supports the following operations (see `apidocs/openapi/notifiers.yaml`): -| Operation | Method & Path | Description | -| --- | --- | --- | -| `createSubscription` | `POST /subscriptions` | Create a new subscription | -| `listSubscriptions` | `GET /subscriptions` | List subscriptions with filters | -| `viewSubscription` | `GET /subscriptions/{id}` | Retrieve a subscription | -| `removeSubscription` | `DELETE /subscriptions/{id}` | Delete a subscription | -| `health` | `GET /health` | Service health check | +| Operation | Method & Path | Description | +| -------------------- | ---------------------------- | ------------------------------- | +| `createSubscription` | `POST /subscriptions` | Create a new subscription | +| `listSubscriptions` | `GET /subscriptions` | List subscriptions with filters | +| `viewSubscription` | `GET /subscriptions/{id}` | Retrieve a subscription | +| `removeSubscription` | `DELETE /subscriptions/{id}` | Delete a subscription | +| `health` | `GET /health` | Service health check | ### Example: Create a subscription @@ -144,7 +144,7 @@ curl -X POST http://localhost:9014/subscriptions \ -H "Authorization: Bearer " \ -H "Content-Type: application/json" \ -d '{ - "topic": "channel.subtopic", + "topic": "channel/subtopic", "contact": "user@example.com" }' ``` @@ -152,7 +152,7 @@ curl -X POST http://localhost:9014/subscriptions \ ### Example: List subscriptions ```bash -curl -X GET "http://localhost:9014/subscriptions?topic=channel.subtopic&contact=user@example.com&limit=20&offset=0" \ +curl -X GET "http://localhost:9014/subscriptions?topic=channel/subtopic&contact=user@example.com&limit=20&offset=0" \ -H "Authorization: Bearer " ``` diff --git a/consumers/notifiers/postgres/init.go b/consumers/notifiers/postgres/init.go index ac74c3c0b..720fb24ae 100644 --- a/consumers/notifiers/postgres/init.go +++ b/consumers/notifiers/postgres/init.go @@ -23,6 +23,20 @@ func Migration() *migrate.MemoryMigrationSource { "DROP TABLE IF EXISTS subscriptions", }, }, + { + Id: "subscriptions_2", + Up: []string{ + // Canonicalize legacy dot-delimited topics to slash-delimited topics. + `UPDATE subscriptions + SET topic = REPLACE(topic, '.', '/') + WHERE topic LIKE '%.%'`, + }, + Down: []string{ + `UPDATE subscriptions + SET topic = REPLACE(topic, '/', '.') + WHERE topic LIKE '%/%'`, + }, + }, }, } } diff --git a/consumers/notifiers/service.go b/consumers/notifiers/service.go index 6b9b1c8bf..d613fb06d 100644 --- a/consumers/notifiers/service.go +++ b/consumers/notifiers/service.go @@ -5,12 +5,12 @@ package notifiers import ( "context" - "fmt" "github.com/absmach/supermq" "github.com/absmach/supermq/consumers" smqauthn "github.com/absmach/supermq/pkg/authn" "github.com/absmach/supermq/pkg/errors" + repoerr "github.com/absmach/supermq/pkg/errors/repository" svcerr "github.com/absmach/supermq/pkg/errors/service" "github.com/absmach/supermq/pkg/messaging" ) @@ -112,24 +112,11 @@ func (ns *notifierService) ConsumeBlocking(ctx context.Context, message any) err if !ok { return ErrMessage } - topic := msg.GetChannel() - if msg.GetSubtopic() != "" { - topic = fmt.Sprintf("%s.%s", msg.GetChannel(), msg.GetSubtopic()) - } - pm := PageMetadata{ - Topic: topic, - Offset: 0, - Limit: -1, - } - page, err := ns.subs.RetrieveAll(ctx, pm) + to, err := ns.recipients(ctx, msg) if err != nil { return err } - var to []string - for _, sub := range page.Subscriptions { - to = append(to, sub.Contact) - } if len(to) > 0 { err := ns.notifier.Notify(ns.from, to, msg) if err != nil { @@ -146,25 +133,12 @@ func (ns *notifierService) ConsumeAsync(ctx context.Context, message any) { ns.errCh <- ErrMessage return } - topic := msg.GetChannel() - if msg.GetSubtopic() != "" { - topic = fmt.Sprintf("%s.%s", msg.GetChannel(), msg.GetSubtopic()) - } - pm := PageMetadata{ - Topic: topic, - Offset: 0, - Limit: -1, - } - page, err := ns.subs.RetrieveAll(ctx, pm) + to, err := ns.recipients(ctx, msg) if err != nil { ns.errCh <- err return } - var to []string - for _, sub := range page.Subscriptions { - to = append(to, sub.Contact) - } if len(to) > 0 { if err := ns.notifier.Notify(ns.from, to, msg); err != nil { ns.errCh <- errors.Wrap(consumers.ErrNotify, err) @@ -175,3 +149,44 @@ func (ns *notifierService) ConsumeAsync(ctx context.Context, message any) { func (ns *notifierService) Errors() <-chan error { return ns.errCh } + +func (ns *notifierService) recipients(ctx context.Context, msg *messaging.Message) ([]string, error) { + topic, ok := subscriptionTopic(msg) + if !ok { + return nil, nil + } + + pm := PageMetadata{ + Topic: topic, + Offset: 0, + Limit: -1, + } + page, err := ns.subs.RetrieveAll(ctx, pm) + if err != nil { + if errors.Contains(err, repoerr.ErrNotFound) { + return nil, nil + } + return nil, err + } + + to := make([]string, 0, len(page.Subscriptions)) + for _, sub := range page.Subscriptions { + to = append(to, sub.Contact) + } + + return to, nil +} + +func subscriptionTopic(msg *messaging.Message) (string, bool) { + channel := msg.GetChannel() + if channel == "" { + return "", false + } + + subtopic := msg.GetSubtopic() + if subtopic == "" { + return channel, true + } + + return channel + "/" + subtopic, true +} diff --git a/consumers/notifiers/service_test.go b/consumers/notifiers/service_test.go index 58bd57715..f5a498c89 100644 --- a/consumers/notifiers/service_test.go +++ b/consumers/notifiers/service_test.go @@ -31,12 +31,17 @@ const ( ) func newService() (notifiers.Service, *authnmocks.Authentication, *mocks.SubscriptionsRepository) { + svc, auth, repo, _ := newServiceWithNotifier() + return svc, auth, repo +} + +func newServiceWithNotifier() (notifiers.Service, *authnmocks.Authentication, *mocks.SubscriptionsRepository, *smqmocks.Notifier) { repo := new(mocks.SubscriptionsRepository) auth := new(authnmocks.Authentication) notifier := new(smqmocks.Notifier) idp := uuid.NewMock() from := "exampleFrom" - return notifiers.New(auth, repo, idp, notifier, from), auth, repo + return notifiers.New(auth, repo, idp, notifier, from), auth, repo, notifier } func TestCreateSubscription(t *testing.T) { @@ -324,7 +329,7 @@ func TestRemoveSubscription(t *testing.T) { } func TestConsume(t *testing.T) { - svc, _, repo := newService() + svc, _, repo, notifier := newServiceWithNotifier() msg := messaging.Message{ Channel: "topic", Subtopic: "subtopic", @@ -357,4 +362,24 @@ func TestConsume(t *testing.T) { assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) repoCall.Unset() } + + canonicalMsg := &messaging.Message{ + Channel: "topic", + Subtopic: "sub/topic", + } + + repo.On("RetrieveAll", context.TODO(), notifiers.PageMetadata{ + Topic: "topic/sub/topic", + Offset: 0, + Limit: -1, + }).Return(notifiers.Page{ + Subscriptions: []notifiers.Subscription{ + {Contact: "user@example.com"}, + }, + }, nil).Once() + + notifier.On("Notify", "exampleFrom", []string{"user@example.com"}, canonicalMsg).Return(nil).Once() + + err := svc.ConsumeBlocking(context.TODO(), canonicalMsg) + assert.NoError(t, err) } diff --git a/consumers/writers/README.md b/consumers/writers/README.md index d0945b762..d5786e2cb 100644 --- a/consumers/writers/README.md +++ b/consumers/writers/README.md @@ -15,66 +15,66 @@ Values shown are from [docker/.env](https://github.com/absmach/magistrala/blob/m #### Postgres Service endpoints -| Variable | Description | Default | -| --- | --- | --- | -| `MG_POSTGRES_WRITER_LOG_LEVEL` | Service log level | `debug` | -| `MG_POSTGRES_WRITER_CONFIG_PATH` | Config file path (topics/transformer) | `/config.toml` | -| `MG_POSTGRES_WRITER_HTTP_HOST` | HTTP host | `postgres-writer` | -| `MG_POSTGRES_WRITER_HTTP_PORT` | HTTP port | `9007` | -| `MG_POSTGRES_WRITER_HTTP_SERVER_CERT` | HTTPS server certificate path | "" | -| `MG_POSTGRES_WRITER_HTTP_SERVER_KEY` | HTTPS server key path | "" | -| `MG_POSTGRES_WRITER_INSTANCE_ID` | Instance ID | "" | +| Variable | Description | Default | +| ------------------------------------- | ------------------------------------- | ----------------- | +| `MG_POSTGRES_WRITER_LOG_LEVEL` | Service log level | `debug` | +| `MG_POSTGRES_WRITER_CONFIG_PATH` | Config file path (topics/transformer) | `/config.toml` | +| `MG_POSTGRES_WRITER_HTTP_HOST` | HTTP host | `postgres-writer` | +| `MG_POSTGRES_WRITER_HTTP_PORT` | HTTP port | `9007` | +| `MG_POSTGRES_WRITER_HTTP_SERVER_CERT` | HTTPS server certificate path | "" | +| `MG_POSTGRES_WRITER_HTTP_SERVER_KEY` | HTTPS server key path | "" | +| `MG_POSTGRES_WRITER_INSTANCE_ID` | Instance ID | "" | #### Postgres Database -| Variable | Description | Default | -| --- | --- | --- | -| `MG_POSTGRES_HOST` | PostgreSQL host | `postgres` | -| `MG_POSTGRES_PORT` | PostgreSQL port | `5432` | -| `MG_POSTGRES_USER` | PostgreSQL user | `supermq` | -| `MG_POSTGRES_PASS` | PostgreSQL password | `supermq` | -| `MG_POSTGRES_NAME` | PostgreSQL database name | `messages` | -| `MG_POSTGRES_SSL_MODE` | PostgreSQL SSL mode | `disable` | -| `MG_POSTGRES_SSL_CERT` | PostgreSQL SSL client cert | "" | -| `MG_POSTGRES_SSL_KEY` | PostgreSQL SSL client key | "" | -| `MG_POSTGRES_SSL_ROOT_CERT` | PostgreSQL SSL root cert | "" | +| Variable | Description | Default | +| --------------------------- | -------------------------- | ---------- | +| `MG_POSTGRES_HOST` | PostgreSQL host | `postgres` | +| `MG_POSTGRES_PORT` | PostgreSQL port | `5432` | +| `MG_POSTGRES_USER` | PostgreSQL user | `supermq` | +| `MG_POSTGRES_PASS` | PostgreSQL password | `supermq` | +| `MG_POSTGRES_NAME` | PostgreSQL database name | `messages` | +| `MG_POSTGRES_SSL_MODE` | PostgreSQL SSL mode | `disable` | +| `MG_POSTGRES_SSL_CERT` | PostgreSQL SSL client cert | "" | +| `MG_POSTGRES_SSL_KEY` | PostgreSQL SSL client key | "" | +| `MG_POSTGRES_SSL_ROOT_CERT` | PostgreSQL SSL root cert | "" | #### Postgres Message broker and observability -| Variable | Description | Default | -| --- | --- | --- | -| `MG_MESSAGE_BROKER_URL` | Message broker URL | `nats://nats:4222` | -| `MG_JAEGER_URL` | Jaeger collector endpoint | `http://jaeger:4318/v1/traces` | -| `MG_JAEGER_TRACE_RATIO` | Trace sampling ratio | `1.0` | -| `MG_SEND_TELEMETRY` | Send telemetry to Magistrala call-home server | `true` | +| Variable | Description | Default | +| ----------------------- | --------------------------------------------- | ------------------------------ | +| `MG_MESSAGE_BROKER_URL` | Message broker URL | `nats://nats:4222` | +| `MG_JAEGER_URL` | Jaeger collector endpoint | `http://jaeger:4318/v1/traces` | +| `MG_JAEGER_TRACE_RATIO` | Trace sampling ratio | `1.0` | +| `MG_SEND_TELEMETRY` | Send telemetry to Magistrala call-home server | `true` | ### Timescale writer #### Timescale Service endpoints -| Variable | Description | Default | -| --- | --- | --- | -| `MG_TIMESCALE_WRITER_LOG_LEVEL` | Service log level | `debug` | -| `MG_TIMESCALE_WRITER_CONFIG_PATH` | Config file path (topics/transformer) | `/config.toml` | -| `MG_TIMESCALE_WRITER_HTTP_HOST` | HTTP host | `timescale-writer` | -| `MG_TIMESCALE_WRITER_HTTP_PORT` | HTTP port | `9012` | -| `MG_TIMESCALE_WRITER_HTTP_SERVER_CERT` | HTTPS server certificate path | "" | -| `MG_TIMESCALE_WRITER_HTTP_SERVER_KEY` | HTTPS server key path | "" | -| `MG_TIMESCALE_WRITER_INSTANCE_ID` | Instance ID | "" | +| Variable | Description | Default | +| -------------------------------------- | ------------------------------------- | ------------------ | +| `MG_TIMESCALE_WRITER_LOG_LEVEL` | Service log level | `debug` | +| `MG_TIMESCALE_WRITER_CONFIG_PATH` | Config file path (topics/transformer) | `/config.toml` | +| `MG_TIMESCALE_WRITER_HTTP_HOST` | HTTP host | `timescale-writer` | +| `MG_TIMESCALE_WRITER_HTTP_PORT` | HTTP port | `9012` | +| `MG_TIMESCALE_WRITER_HTTP_SERVER_CERT` | HTTPS server certificate path | "" | +| `MG_TIMESCALE_WRITER_HTTP_SERVER_KEY` | HTTPS server key path | "" | +| `MG_TIMESCALE_WRITER_INSTANCE_ID` | Instance ID | "" | #### Timescale Database -| Variable | Description | Default | -| --- | --- | --- | -| `MG_TIMESCALE_HOST` | TimescaleDB host | `timescale` | -| `MG_TIMESCALE_PORT` | TimescaleDB port | `5432` | -| `MG_TIMESCALE_USER` | TimescaleDB user | `supermq` | -| `MG_TIMESCALE_PASS` | TimescaleDB password | `supermq` | -| `MG_TIMESCALE_NAME` | TimescaleDB database name | `supermq` | -| `MG_TIMESCALE_SSL_MODE` | TimescaleDB SSL mode | `disable` | -| `MG_TIMESCALE_SSL_CERT` | TimescaleDB SSL client cert | "" | -| `MG_TIMESCALE_SSL_KEY` | TimescaleDB SSL client key | "" | -| `MG_TIMESCALE_SSL_ROOT_CERT` | TimescaleDB SSL root cert | "" | +| Variable | Description | Default | +| ---------------------------- | --------------------------- | ----------- | +| `MG_TIMESCALE_HOST` | TimescaleDB host | `timescale` | +| `MG_TIMESCALE_PORT` | TimescaleDB port | `5432` | +| `MG_TIMESCALE_USER` | TimescaleDB user | `supermq` | +| `MG_TIMESCALE_PASS` | TimescaleDB password | `supermq` | +| `MG_TIMESCALE_NAME` | TimescaleDB database name | `supermq` | +| `MG_TIMESCALE_SSL_MODE` | TimescaleDB SSL mode | `disable` | +| `MG_TIMESCALE_SSL_CERT` | TimescaleDB SSL client cert | "" | +| `MG_TIMESCALE_SSL_KEY` | TimescaleDB SSL client key | "" | +| `MG_TIMESCALE_SSL_ROOT_CERT` | TimescaleDB SSL root cert | "" | #### Timescale Message broker and observability @@ -91,7 +91,7 @@ The config file controls subscription topics and optional transformer settings f ```toml ["subscriber"] -topics = ["writers.>"] +topics = ["writers/#"] [transformer] format = "senml" @@ -104,24 +104,24 @@ time_fields = [ ] ``` -The topic filter uses `writers.*` syntax in the config file for both backends. Writers do not expose broker mode, delivery policy, or consumer-group settings in this file. They always consume through the stream-backed broker adapter in `consumers/writers/brokers`: +The topic filter uses slash-delimited MQTT-style syntax (`+`, `#`) in the config file for both backends. Writers do not expose broker mode, delivery policy, or consumer-group settings in this file. They always consume through the stream-backed broker adapter in `consumers/writers/brokers`: - NATS builds use JetStream streams with durable consumers. -- FluxMQ builds publish to and consume from the `writers` stream queue while preserving the same `writers.>` config syntax. +- FluxMQ builds publish to and consume from the `writers` stream queue while preserving the same `writers/#` config syntax. ## Features - **Message persistence**: Stores incoming SenML messages into PostgreSQL or TimescaleDB. - **JSON payload support**: Saves JSON payloads into dynamically created tables. - **Stream-backed ingestion**: Consumes through NATS JetStream durable consumers or FluxMQ stream queues. -- **Configurable subscription**: Limits ingestion to specific `writers.*` topics. +- **Configurable subscription**: Limits ingestion to specific `writers//` topics. - **Observability**: Exposes `/health` and `/metrics` endpoints, with Jaeger tracing. ## Architecture ### Runtime flow -1. The rules engine publishes writer messages under `writers.*`. +1. The rules engine publishes writer messages under `writers//`. 2. The writer loads `config.toml` to select topic filters and transformer settings. 3. The broker adapter consumes from the underlying stream-backed implementation. 4. The consumer converts messages to SenML or JSON payloads. @@ -138,22 +138,22 @@ The topic filter uses `writers.*` syntax in the config file for both backends. W Defined in `consumers/writers/postgres/init.go`: -| Column | Type | Description | -| --- | --- | --- | -| `id` | `UUID` | Message ID | -| `channel` | `UUID` | Channel ID | -| `subtopic` | `VARCHAR(254)` | Subtopic | -| `publisher` | `UUID` | Publisher ID | -| `protocol` | `TEXT` | Protocol name | -| `name` | `TEXT` | SenML name | -| `unit` | `TEXT` | SenML unit | -| `value` | `FLOAT` | Numeric value | -| `string_value` | `TEXT` | String value | -| `bool_value` | `BOOL` | Boolean value | -| `data_value` | `BYTEA` | Data value | -| `sum` | `FLOAT` | Sum value | -| `time` | `FLOAT` | Measurement time | -| `update_time` | `FLOAT` | Update time | +| Column | Type | Description | +| -------------- | -------------- | ---------------- | +| `id` | `UUID` | Message ID | +| `channel` | `UUID` | Channel ID | +| `subtopic` | `VARCHAR(254)` | Subtopic | +| `publisher` | `UUID` | Publisher ID | +| `protocol` | `TEXT` | Protocol name | +| `name` | `TEXT` | SenML name | +| `unit` | `TEXT` | SenML unit | +| `value` | `FLOAT` | Numeric value | +| `string_value` | `TEXT` | String value | +| `bool_value` | `BOOL` | Boolean value | +| `data_value` | `BYTEA` | Data value | +| `sum` | `FLOAT` | Sum value | +| `time` | `FLOAT` | Measurement time | +| `update_time` | `FLOAT` | Update time | Primary key: `(time, publisher, subtopic, name)` @@ -161,21 +161,21 @@ Primary key: `(time, publisher, subtopic, name)` Defined in `consumers/writers/timescale/init.go`: -| Column | Type | Description | -| --- | --- | --- | -| `time` | `BIGINT` | Measurement time | -| `channel` | `UUID` | Channel ID | -| `subtopic` | `VARCHAR(254)` | Subtopic | -| `publisher` | `VARCHAR(254)` | Publisher ID | -| `protocol` | `TEXT` | Protocol name | -| `name` | `VARCHAR(254)` | SenML name | -| `unit` | `TEXT` | SenML unit | -| `value` | `FLOAT` | Numeric value | -| `string_value` | `TEXT` | String value | -| `bool_value` | `BOOL` | Boolean value | -| `data_value` | `BYTEA` | Data value | -| `sum` | `FLOAT` | Sum value | -| `update_time` | `FLOAT` | Update time | +| Column | Type | Description | +| -------------- | -------------- | ---------------- | +| `time` | `BIGINT` | Measurement time | +| `channel` | `UUID` | Channel ID | +| `subtopic` | `VARCHAR(254)` | Subtopic | +| `publisher` | `VARCHAR(254)` | Publisher ID | +| `protocol` | `TEXT` | Protocol name | +| `name` | `VARCHAR(254)` | SenML name | +| `unit` | `TEXT` | SenML unit | +| `value` | `FLOAT` | Numeric value | +| `string_value` | `TEXT` | String value | +| `bool_value` | `BOOL` | Boolean value | +| `data_value` | `BYTEA` | Data value | +| `sum` | `FLOAT` | Sum value | +| `update_time` | `FLOAT` | Update time | Primary key: `(time, channel, subtopic, protocol, publisher, name)` @@ -262,10 +262,10 @@ go test ./consumers/writers/... Writers do not expose a message ingestion API. Messages are written via the message broker, and writers consume them through the stream-backed broker adapter. The HTTP API provides only health and metrics endpoints. -| Endpoint | Description | -| --- | --- | -| `GET /health` | Service health check | -| `GET /metrics` | Prometheus metrics | +| Endpoint | Description | +| -------------- | -------------------- | +| `GET /health` | Service health check | +| `GET /metrics` | Prometheus metrics | For an in-depth explanation of Writers, see the [official documentation][doc]. diff --git a/consumers/writers/brokers/brokers_nats.go b/consumers/writers/brokers/brokers_nats.go index 18bfc19c7..acab5d746 100644 --- a/consumers/writers/brokers/brokers_nats.go +++ b/consumers/writers/brokers/brokers_nats.go @@ -17,7 +17,7 @@ import ( ) const ( - AllTopic = "writers.>" + AllTopic = "writers/#" prefix = "writers" ) diff --git a/docker/.env b/docker/.env index 1a8e82a5a..385b94f8e 100644 --- a/docker/.env +++ b/docker/.env @@ -281,7 +281,7 @@ MG_GROUPS_GRPC_CLIENT_CA_CERTS=${GRPC_MTLS:+./ssl/certs/ca.crt} MG_CLIENTS_LOG_LEVEL=debug MG_CLIENTS_STANDALONE_ID= MG_CLIENTS_STANDALONE_TOKEN= -MG_CLIENTS_CACHE_KEY_DURATION=10m +MG_CLIENTS_CACHE_KEY_DURATION=1h MG_CLIENTS_HTTP_HOST=clients MG_CLIENTS_HTTP_PORT=9006 MG_CLIENTS_GRPC_HOST=clients diff --git a/docker/addons/postgres-writer/config.toml b/docker/addons/postgres-writer/config.toml index 0f3343bcc..f00585d71 100644 --- a/docker/addons/postgres-writer/config.toml +++ b/docker/addons/postgres-writer/config.toml @@ -2,13 +2,13 @@ # SPDX-License-Identifier: Apache-2.0 # Writers consume through the broker's stream-backed path; this file only -# selects topic filters. Use NATS-style filters here even when built with -# FluxMQ. -# To listen on all writer topics use the default value "writers.>". -# To subscribe to specific topics use values starting with "writers." and -# followed by a subtopic (e.g. ["writers..sub.topic.x", ...]). +# selects topic filters. Use slash-delimited MQTT-style filters (`+`, `#`) +# for both NATS and FluxMQ builds. +# To listen on all writer topics use the default value "writers/#". +# To subscribe to specific topics use values starting with "writers/" and +# followed by a subtopic (e.g. ["writers//sub/topic/x", ...]). ["subscriber"] -topics = ["writers.>"] +topics = ["writers/#"] [transformer] # SenML or JSON diff --git a/docker/addons/timescale-writer/config.toml b/docker/addons/timescale-writer/config.toml index 820dde6ac..30dac8cc7 100644 --- a/docker/addons/timescale-writer/config.toml +++ b/docker/addons/timescale-writer/config.toml @@ -2,10 +2,10 @@ # SPDX-License-Identifier: Apache-2.0 # Writers consume through the broker's stream-backed path; this file only -# selects topic filters. Use NATS-style filters here even when built with -# FluxMQ. -# To listen on all writer topics use the default value "writers.>". -# To subscribe to specific topics use values starting with "writers." and -# followed by a subtopic (e.g. ["writers..sub.topic.x", ...]). +# selects topic filters. Use slash-delimited MQTT-style filters (`+`, `#`) +# for both NATS and FluxMQ builds. +# To listen on all writer topics use the default value "writers/#". +# To subscribe to specific topics use values starting with "writers/" and +# followed by a subtopic (e.g. ["writers//sub/topic/x", ...]). ["subscriber"] -topics = ["writers.>"] +topics = ["writers/#"] diff --git a/docker/fluxmq/node1.yaml b/docker/fluxmq/node1.yaml index 51014dd64..726867138 100644 --- a/docker/fluxmq/node1.yaml +++ b/docker/fluxmq/node1.yaml @@ -122,7 +122,7 @@ queues: auth: url: "http://fluxmq-auth:7016" transport: "grpc" - timeout: 5s + timeout: 15s protocols: mqtt: true http: true diff --git a/docker/fluxmq/node2.yaml b/docker/fluxmq/node2.yaml index 27a541067..a63c8316c 100644 --- a/docker/fluxmq/node2.yaml +++ b/docker/fluxmq/node2.yaml @@ -119,7 +119,7 @@ queues: auth: url: "http://fluxmq-auth:7016" transport: "grpc" - timeout: 5s + timeout: 15s protocols: mqtt: true http: true diff --git a/docker/fluxmq/node3.yaml b/docker/fluxmq/node3.yaml index 75b30790d..21ea443ad 100644 --- a/docker/fluxmq/node3.yaml +++ b/docker/fluxmq/node3.yaml @@ -119,7 +119,7 @@ queues: auth: url: "http://fluxmq-auth:7016" transport: "grpc" - timeout: 5s + timeout: 15s protocols: mqtt: true http: true diff --git a/journal/service.go b/journal/service.go index a045ab44d..0321bca3d 100644 --- a/journal/service.go +++ b/journal/service.go @@ -6,13 +6,13 @@ package journal import ( "context" "fmt" - "strings" "time" "github.com/absmach/supermq" smqauthn "github.com/absmach/supermq/pkg/authn" "github.com/absmach/supermq/pkg/errors" svcerr "github.com/absmach/supermq/pkg/errors/service" + "github.com/absmach/supermq/pkg/messaging" ) const ( @@ -28,6 +28,7 @@ const ( var ( errSaveJournal = errors.New("failed to save journal") errHandleTelemetry = errors.New("failed to handle client telemetry") + errInvalidSubTopic = errors.New("invalid subscribe topic") ) type service struct { @@ -139,10 +140,9 @@ func (svc *service) addSubscription(ctx context.Context, journal Journal) error if err != nil { return err } - var subtopic string - topics := strings.Split(ae.topic, ".") - if len(topics) > 2 { - subtopic = topics[2] + channelID, subtopic, err := parseSubscriptionTopic(ae.topic) + if err != nil { + return err } id, err := svc.idProvider.ID() @@ -153,7 +153,7 @@ func (svc *service) addSubscription(ctx context.Context, journal Journal) error sub := ClientSubscription{ ID: id, SubscriberID: ae.subscriberID, - ChannelID: topics[1], + ChannelID: channelID, Subtopic: subtopic, ClientID: ae.clientID, } @@ -161,6 +161,17 @@ func (svc *service) addSubscription(ctx context.Context, journal Journal) error return svc.repository.AddSubscription(ctx, sub) } +func parseSubscriptionTopic(topic string) (string, string, error) { + _, channelID, subtopic, _, err := messaging.ParseSubscribeTopic(topic) + if err != nil { + return "", "", errors.Wrap(errInvalidSubTopic, err) + } + if channelID == "" { + return "", "", errInvalidSubTopic + } + return channelID, subtopic, nil +} + func (svc *service) addMqttSubscription(ctx context.Context, journal Journal) error { ae, err := toMqttSubscribeEvent(journal) if err != nil { diff --git a/pkg/events/store/store_nats.go b/pkg/events/store/store_nats.go index 08378a040..f3dd4e1c9 100644 --- a/pkg/events/store/store_nats.go +++ b/pkg/events/store/store_nats.go @@ -16,7 +16,7 @@ import ( ) // StreamAllEvents represents subject to subscribe for all the events. -const StreamAllEvents = "events.>" +const StreamAllEvents = "events/#" func init() { log.Println("The binary was build using Nats as the events store") diff --git a/pkg/messaging/brokers/brokers_fluxmq.go b/pkg/messaging/brokers/brokers_fluxmq.go index ea5d4a7a7..b7f339535 100644 --- a/pkg/messaging/brokers/brokers_fluxmq.go +++ b/pkg/messaging/brokers/brokers_fluxmq.go @@ -1,8 +1,8 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -//go:build msg_fluxmq -// +build msg_fluxmq +//go:build !msg_nats +// +build !msg_nats package brokers @@ -16,7 +16,7 @@ import ( ) // SubjectAllMessages represents subject to subscribe for all the messages. -const SubjectAllMessages = string(messaging.MsgTopicPrefix) + ".>" +const SubjectAllMessages = string(messaging.MsgTopicPrefix) + "/#" func init() { log.Println("The binary was built using FluxMQ as the message broker") diff --git a/pkg/messaging/brokers/brokers_nats.go b/pkg/messaging/brokers/brokers_nats.go index a28640770..949927469 100644 --- a/pkg/messaging/brokers/brokers_nats.go +++ b/pkg/messaging/brokers/brokers_nats.go @@ -1,8 +1,8 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -//go:build !msg_fluxmq && !msg_rabbitmq && !rabbitmq -// +build !msg_fluxmq,!msg_rabbitmq,!rabbitmq +//go:build msg_nats +// +build msg_nats package brokers @@ -16,7 +16,7 @@ import ( ) // SubjectAllMessages represents subject to subscribe for all the messages. -const SubjectAllMessages = string(messaging.MsgTopicPrefix) + ".>" +const SubjectAllMessages = string(messaging.MsgTopicPrefix) + "/#" func init() { log.Println("The binary was built using NATS as the message broker") diff --git a/pkg/messaging/fluxmq/options.go b/pkg/messaging/fluxmq/options.go index 236e06ee4..906e29638 100644 --- a/pkg/messaging/fluxmq/options.go +++ b/pkg/messaging/fluxmq/options.go @@ -5,6 +5,7 @@ package fluxmq import ( "errors" + "strings" "github.com/absmach/supermq/pkg/messaging" "github.com/nats-io/nats.go/jetstream" @@ -32,9 +33,15 @@ func Prefix(prefix string) messaging.Option { return func(val any) error { switch v := val.(type) { case *publisher: - v.prefix = prefix + v.prefix = strings.TrimSpace(prefix) + if v.prefix == "" { + v.prefix = msgPrefix + } case *pubsub: - v.prefix = prefix + v.prefix = strings.TrimSpace(prefix) + if v.prefix == "" { + v.prefix = msgPrefix + } default: return ErrInvalidType } diff --git a/pkg/messaging/fluxmq/pubsub.go b/pkg/messaging/fluxmq/pubsub.go index fc1c4088d..aaee99030 100644 --- a/pkg/messaging/fluxmq/pubsub.go +++ b/pkg/messaging/fluxmq/pubsub.go @@ -95,7 +95,7 @@ func (ps *pubsub) Subscribe(_ context.Context, cfg messaging.SubscriberConfig) e group := formatConsumerName(cfg.Topic, cfg.ID) opts := &fluxamqp.StreamConsumeOptions{ - QueueName: streamQueue(ps.prefix), + QueueName: ps.prefix, Filter: streamFilter(ps.prefix, cfg.Topic), ConsumerGroup: group, } @@ -118,7 +118,6 @@ func (ps *pubsub) Subscribe(_ context.Context, cfg messaging.SubscriberConfig) e sub := subscription{ streamTopic: queueFilter(ps.prefix, cfg.Topic), } - if ps.directTopicIngress { // Subscribe to regular MQTT topics so that messages published directly // by MQTT clients (not through the stream queue) are also received. @@ -137,7 +136,6 @@ func (ps *pubsub) Subscribe(_ context.Context, cfg messaging.SubscriberConfig) e ps.mu.Lock() ps.subscriptions[subscriptionKey(cfg.ID, cfg.Topic)] = sub ps.mu.Unlock() - return nil } diff --git a/pkg/messaging/fluxmq/topic.go b/pkg/messaging/fluxmq/topic.go index 6b8bca591..0e665daad 100644 --- a/pkg/messaging/fluxmq/topic.go +++ b/pkg/messaging/fluxmq/topic.go @@ -13,113 +13,88 @@ import ( const queuePrefix = "$queue/" -var ( - topicReplacer = strings.NewReplacer(".", "/", "*", "+", ">", "#") - nameReplacer = strings.NewReplacer( - " ", "_", - ".", "_", - "*", "_", - ">", "_", - "/", "_", - "\\", "_", - ) +var nameReplacer = strings.NewReplacer( + " ", "_", + ".", "_", + "*", "_", + ">", "_", + "/", "_", + "\\", "_", + "+", "_", + "#", "_", ) -func canonicalPrefix(prefix string) string { - prefix = strings.TrimSpace(prefix) - if prefix == "" { - return msgPrefix - } - return prefix -} - -func streamQueue(prefix string) string { - return canonicalPrefix(prefix) -} - -func brokerPath(topic string) string { - topic = strings.TrimSpace(topic) - topic = strings.TrimPrefix(topic, ".") - if topic == "" { - return "" - } - - return topicReplacer.Replace(topic) -} - func streamFilter(prefix, topic string) string { - path := filterPath(prefix, topic) - if path == "" { - return "#" - } - return path -} - -func queueFilter(prefix, topic string) string { - queue := streamQueue(prefix) - path := streamFilter(prefix, topic) - if path == "#" { - return queuePrefix + queue + "/#" - } - - return queuePrefix + queue + "/" + path -} - -func queueTopic(prefix, topic string) string { - queue := streamQueue(prefix) - path := brokerPath(topic) - if path == "" { - return queuePrefix + queue - } - - return queuePrefix + queue + "/" + path -} - -func filterPath(prefix, topic string) string { topic = strings.TrimSpace(topic) - if topic == "" || topic == ">" { + if topic == "" || topic == "#" { return "#" } - prefix = canonicalPrefix(prefix) switch { case topic == prefix: - topic = ">" - case strings.HasPrefix(topic, prefix+"."): - topic = strings.TrimPrefix(topic, prefix+".") + return "#" + case strings.HasPrefix(topic, prefix+"/"): + topic = strings.TrimPrefix(topic, prefix+"/") } - return brokerPath(topic) -} + topic = strings.TrimPrefix(strings.TrimSpace(topic), "/") + if topic == "" { + return "#" + } -func formatConsumerName(topic, id string) string { - // Consumer group names must avoid whitespace and wildcard/path separators. - topic = nameReplacer.Replace(topic) - id = nameReplacer.Replace(id) - return fmt.Sprintf("%s-%s", topic, id) + return topic } // topicFilter returns the MQTT topic filter for subscribing to regular -// (non-queued) messages. It converts a NATS-style topic to MQTT format -// with the prefix prepended. -// For example, with prefix "m" and topic "m.>", it returns "m/#". +// (non-queued) messages. It strips the prefix and re-prepends it to +// normalize the filter. func topicFilter(prefix, topic string) string { - prefix = canonicalPrefix(prefix) - path := filterPath(prefix, topic) - if path == "" || path == "#" { + topic = strings.TrimSpace(topic) + if topic == "" || topic == "#" { return prefix + "/#" } - return prefix + "/" + path + switch { + case topic == prefix: + return prefix + "/#" + case strings.HasPrefix(topic, prefix+"/"): + topic = strings.TrimPrefix(topic, prefix+"/") + } + + topic = strings.TrimPrefix(strings.TrimSpace(topic), "/") + if topic == "" || topic == "#" { + return prefix + "/#" + } + + return prefix + "/" + topic +} + +func queueFilter(prefix, topic string) string { + path := streamFilter(prefix, topic) + if path == "#" { + return queuePrefix + prefix + "/#" + } + + return queuePrefix + prefix + "/" + path +} + +func queueTopic(prefix, topic string) string { + topic = strings.TrimPrefix(strings.TrimSpace(topic), "/") + if topic == "" { + return queuePrefix + prefix + } + + return queuePrefix + prefix + "/" + topic } func parseMQTTTopic(prefix, topic string) (domainID, channelID, subtopic string, err error) { topic = strings.TrimPrefix(strings.TrimSpace(topic), "/") - prefix = canonicalPrefix(prefix) if !strings.HasPrefix(topic, prefix+"/") { return "", "", "", messaging.ErrMalformedTopic } - normalized := "/" + msgPrefix + "/" + strings.TrimPrefix(topic, prefix+"/") + // Replace the broker-specific prefix with the canonical message prefix + // so ParseSubscribeTopic can parse it. + normalized := msgPrefix + "/" + strings.TrimPrefix(topic, prefix+"/") domainID, channelID, subtopic, _, err = messaging.ParseSubscribeTopic(normalized) if err != nil { @@ -149,8 +124,15 @@ func stringHeader(headers map[string]any, key string) string { func declareStream(client *fluxamqp.Client, prefix string) error { _, err := client.DeclareStreamQueue(&fluxamqp.StreamQueueOptions{ - Name: streamQueue(prefix), + Name: prefix, Durable: true, }) return err } + +func formatConsumerName(topic, id string) string { + // Consumer group names must avoid whitespace and wildcard/path separators. + topic = nameReplacer.Replace(topic) + id = nameReplacer.Replace(id) + return fmt.Sprintf("%s-%s", topic, id) +} diff --git a/pkg/messaging/fluxmq/topic_test.go b/pkg/messaging/fluxmq/topic_test.go index b4c844680..2500c6a53 100644 --- a/pkg/messaging/fluxmq/topic_test.go +++ b/pkg/messaging/fluxmq/topic_test.go @@ -9,7 +9,7 @@ import ( ) func TestQueueTopic(t *testing.T) { - got := queueTopic("m", "domain.c.channel.subtopic") + got := queueTopic("m", "domain/c/channel/subtopic") want := "$queue/m/domain/c/channel/subtopic" if got != want { t.Fatalf("queue topic mismatch: got %q, want %q", got, want) @@ -26,25 +26,25 @@ func TestStreamFilter(t *testing.T) { { name: "all messages with prefix", prefix: "m", - topic: "m.>", + topic: "m/#", want: "#", }, { name: "all messages without explicit prefix", prefix: "writers", - topic: ">", + topic: "#", want: "#", }, { name: "specific topic filter", prefix: "writers", - topic: "writers.domain.c.channel.*", + topic: "writers/domain/c/channel/+", want: "domain/c/channel/+", }, { name: "topic without prefix", prefix: "alarms", - topic: "domain.c.channel.>", + topic: "domain/c/channel/#", want: "domain/c/channel/#", }, } @@ -60,7 +60,7 @@ func TestStreamFilter(t *testing.T) { } func TestQueueFilter(t *testing.T) { - got := queueFilter("writers", "writers.>") + got := queueFilter("writers", "writers/#") want := "$queue/writers/#" if got != want { t.Fatalf("queue filter mismatch: got %q, want %q", got, want) @@ -77,25 +77,25 @@ func TestTopicFilter(t *testing.T) { { name: "all messages with prefix", prefix: "m", - topic: "m.>", + topic: "m/#", want: "m/#", }, { name: "wildcard topic", prefix: "writers", - topic: ">", + topic: "#", want: "writers/#", }, { name: "specific topic", prefix: "m", - topic: "m.domain.c.channel.subtopic", + topic: "m/domain/c/channel/subtopic", want: "m/domain/c/channel/subtopic", }, { name: "single-level wildcard", prefix: "m", - topic: "m.domain.c.*.subtopic", + topic: "m/domain/c/+/subtopic", want: "m/domain/c/+/subtopic", }, } @@ -126,7 +126,7 @@ func TestParseMQTTTopic(t *testing.T) { topic: "m/domain/c/channel/sub/topic", domain: "domain", channel: "channel", - subtopic: "sub.topic", + subtopic: "sub/topic", }, { name: "alternate prefix without subtopic", @@ -142,7 +142,7 @@ func TestParseMQTTTopic(t *testing.T) { topic: "/alarms/domain/c/channel/critical/high", domain: "domain", channel: "channel", - subtopic: "critical.high", + subtopic: "critical/high", }, { name: "mismatched prefix", @@ -223,7 +223,7 @@ func TestParseMQTTTopicFromStreamRoutingKey(t *testing.T) { prefix: "alarms", domain: "dom", channel: "ch", - subtopic: "critical.high", + subtopic: "critical/high", }, } for _, tc := range cases { @@ -264,7 +264,7 @@ func TestStringHeader(t *testing.T) { } func TestFormatConsumerName(t *testing.T) { - got := formatConsumerName("m.domain.c.channel.>", "re/service 1") + got := formatConsumerName("m/domain/c/channel/#", "re/service 1") want := "m_domain_c_channel__-re_service_1" if got != want { t.Fatalf("consumer name mismatch: got %q, want %q", got, want) diff --git a/pkg/messaging/nats/publisher.go b/pkg/messaging/nats/publisher.go index 93317b4f1..c005b5d31 100644 --- a/pkg/messaging/nats/publisher.go +++ b/pkg/messaging/nats/publisher.go @@ -73,7 +73,7 @@ func (pub *publisher) Publish(ctx context.Context, topic string, msg *messaging. return err } - subject := fmt.Sprintf("%s.%s", pub.prefix, topic) + subject := fmt.Sprintf("%s.%s", pub.prefix, toNATSTopic(topic)) if _, err = pub.js.Publish(ctx, subject, data); err != nil { return err } diff --git a/pkg/messaging/nats/pubsub.go b/pkg/messaging/nats/pubsub.go index ad347f79b..216fec132 100644 --- a/pkg/messaging/nats/pubsub.go +++ b/pkg/messaging/nats/pubsub.go @@ -83,12 +83,13 @@ func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) // nolint:contextcheck nh := ps.natsHandler(cfg.Handler) + natsTopic := toNATSTopic(cfg.Topic) consumerConfig := jetstream.ConsumerConfig{ Name: formatConsumerName(cfg.Topic, cfg.ID), Durable: formatConsumerName(cfg.Topic, cfg.ID), Description: fmt.Sprintf("SuperMQ consumer of id %s for cfg.Topic %s", cfg.ID, cfg.Topic), DeliverPolicy: jetstream.DeliverNewPolicy, - FilterSubject: cfg.Topic, + FilterSubject: natsTopic, } if cfg.Ordered { @@ -205,6 +206,14 @@ func (ps *pubsub) handleAck(at messaging.AckType, m jetstream.Msg) { } } +// toNATSTopic converts a canonical /-separated topic with MQTT wildcards +// to NATS format using . separators and NATS wildcards. +var natsTopicReplacer = strings.NewReplacer("/", ".", "+", "*", "#", ">") + +func toNATSTopic(topic string) string { + return natsTopicReplacer.Replace(topic) +} + func formatConsumerName(topic, id string) string { // A durable name cannot contain whitespace, ., *, >, path separators (forward or backwards slash), and non-printable characters. chars := []string{ @@ -214,6 +223,8 @@ func formatConsumerName(topic, id string) string { ">", "_", "/", "_", "\\", "_", + "+", "_", + "#", "_", } topic = strings.NewReplacer(chars...).Replace(topic) diff --git a/pkg/messaging/nats/pubsub_test.go b/pkg/messaging/nats/pubsub_test.go index 99688cd1e..0a68a78ce 100644 --- a/pkg/messaging/nats/pubsub_test.go +++ b/pkg/messaging/nats/pubsub_test.go @@ -37,7 +37,7 @@ var ( func TestPublisher(t *testing.T) { subCfg := messaging.SubscriberConfig{ ID: clientID, - Topic: fmt.Sprintf("%s.>", msgPrefix), + Topic: fmt.Sprintf("%s/#", msgPrefix), Handler: handler{}, } err := pubsub.Subscribe(context.TODO(), subCfg) @@ -116,7 +116,7 @@ func TestPubsub(t *testing.T) { }{ { desc: "Subscribe to a topic with an ID", - topic: fmt.Sprintf("%s.%s", msgPrefix, topic), + topic: fmt.Sprintf("%s/%s", msgPrefix, topic), clientID: "clientid1", errorMessage: nil, pubsub: true, @@ -124,7 +124,7 @@ func TestPubsub(t *testing.T) { }, { desc: "Subscribe using malformed topic and ID", - topic: fmt.Sprintf("%s.>", msgPrefix), + topic: fmt.Sprintf("%s/#", msgPrefix), clientID: "clientid1", errorMessage: nil, pubsub: true, @@ -132,7 +132,7 @@ func TestPubsub(t *testing.T) { }, { desc: "Subscribe using malformed topic and ID", - topic: fmt.Sprintf("%s.*", msgPrefix), + topic: fmt.Sprintf("%s/+", msgPrefix), clientID: "clientid1", errorMessage: nil, pubsub: true, @@ -140,7 +140,7 @@ func TestPubsub(t *testing.T) { }, { desc: "Subscribe to the same topic with a different ID", - topic: fmt.Sprintf("%s.%s", msgPrefix, topic), + topic: fmt.Sprintf("%s/%s", msgPrefix, topic), clientID: "clientid2", errorMessage: nil, pubsub: true, @@ -148,7 +148,7 @@ func TestPubsub(t *testing.T) { }, { desc: "Subscribe to an already subscribed topic with an ID", - topic: fmt.Sprintf("%s.%s", msgPrefix, topic), + topic: fmt.Sprintf("%s/%s", msgPrefix, topic), clientID: "clientid1", errorMessage: nil, pubsub: true, @@ -156,7 +156,7 @@ func TestPubsub(t *testing.T) { }, { desc: "Unsubscribe from a topic with an ID", - topic: fmt.Sprintf("%s.%s", msgPrefix, topic), + topic: fmt.Sprintf("%s/%s", msgPrefix, topic), clientID: "clientid1", errorMessage: nil, pubsub: false, @@ -172,7 +172,7 @@ func TestPubsub(t *testing.T) { }, { desc: "Unsubscribe from the same topic with a different ID", - topic: fmt.Sprintf("%s.%s", msgPrefix, topic), + topic: fmt.Sprintf("%s/%s", msgPrefix, topic), clientID: "clientidd2", errorMessage: nats.ErrNotSubscribed, pubsub: false, @@ -180,7 +180,7 @@ func TestPubsub(t *testing.T) { }, { desc: "Unsubscribe from the same topic with a different ID not subscribed", - topic: fmt.Sprintf("%s.%s", msgPrefix, topic), + topic: fmt.Sprintf("%s/%s", msgPrefix, topic), clientID: "clientidd3", errorMessage: nats.ErrNotSubscribed, pubsub: false, @@ -188,7 +188,7 @@ func TestPubsub(t *testing.T) { }, { desc: "Unsubscribe from an already unsubscribed topic with an ID", - topic: fmt.Sprintf("%s.%s", msgPrefix, topic), + topic: fmt.Sprintf("%s/%s", msgPrefix, topic), clientID: "clientid1", errorMessage: nats.ErrNotSubscribed, pubsub: false, @@ -196,7 +196,7 @@ func TestPubsub(t *testing.T) { }, { desc: "Subscribe to a topic with a subtopic with an ID", - topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic, subtopic), + topic: fmt.Sprintf("%s/%s/%s", msgPrefix, topic, subtopic), clientID: "clientidd1", errorMessage: nil, pubsub: true, @@ -204,7 +204,7 @@ func TestPubsub(t *testing.T) { }, { desc: "Subscribe to an already subscribed topic with a subtopic with an ID", - topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic, subtopic), + topic: fmt.Sprintf("%s/%s/%s", msgPrefix, topic, subtopic), clientID: "clientidd1", errorMessage: nil, pubsub: true, @@ -212,7 +212,7 @@ func TestPubsub(t *testing.T) { }, { desc: "Unsubscribe from a topic with a subtopic with an ID", - topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic, subtopic), + topic: fmt.Sprintf("%s/%s/%s", msgPrefix, topic, subtopic), clientID: "clientidd1", errorMessage: nil, pubsub: false, @@ -220,7 +220,7 @@ func TestPubsub(t *testing.T) { }, { desc: "Unsubscribe from an already unsubscribed topic with a subtopic with an ID", - topic: fmt.Sprintf("%s.%s.%s", msgPrefix, topic, subtopic), + topic: fmt.Sprintf("%s/%s/%s", msgPrefix, topic, subtopic), clientID: "clientid1", errorMessage: nats.ErrNotSubscribed, pubsub: false, @@ -244,7 +244,7 @@ func TestPubsub(t *testing.T) { }, { desc: "Subscribe to a topic with empty id", - topic: fmt.Sprintf("%s.%s", msgPrefix, topic), + topic: fmt.Sprintf("%s/%s", msgPrefix, topic), clientID: "", errorMessage: nats.ErrEmptyID, pubsub: true, @@ -252,7 +252,7 @@ func TestPubsub(t *testing.T) { }, { desc: "Unsubscribe from a topic with empty id", - topic: fmt.Sprintf("%s.%s", msgPrefix, topic), + topic: fmt.Sprintf("%s/%s", msgPrefix, topic), clientID: "", errorMessage: nats.ErrEmptyID, pubsub: false, diff --git a/pkg/messaging/topics.go b/pkg/messaging/topics.go index f3609b863..2a23bffcf 100644 --- a/pkg/messaging/topics.go +++ b/pkg/messaging/topics.go @@ -23,11 +23,11 @@ const ( ) var ( - mqWildcards = "+#" - wildcards = "*>" - subtopicInvalidChars = " #+" - wildcardsReplacer = strings.NewReplacer("+", "*", "#", ">") - pathReplacer = strings.NewReplacer("/", ".") + // MQTT wildcards are the canonical wildcard characters. + mqttWildcards = "+#" + natsWildcards = "*>" + subtopicInvalidChars = " " + subtopicSep = "/" DefaultCacheConfig = CacheConfig{ NumCounters: 2e5, // 200k @@ -269,11 +269,11 @@ func ParsePublishSubtopic(subtopic string) (parseSubTopic string, err error) { return "", errors.Wrap(ErrMalformedSubtopic, err) } - if strings.ContainsAny(subtopic, subtopicInvalidChars+wildcards) { + if strings.ContainsAny(subtopic, subtopicInvalidChars+mqttWildcards+natsWildcards) { return "", ErrMalformedSubtopic } - if strings.Contains(subtopic, "..") { + if strings.Contains(subtopic, "//") { return "", ErrMalformedSubtopic } @@ -298,24 +298,21 @@ func ParseSubscribeSubtopic(subtopic string) (parseSubTopic string, err error) { return "", nil } - if strings.ContainsAny(subtopic, mqWildcards) { - subtopic = wildcardsReplacer.Replace(subtopic) - } subtopic, err = formatSubtopic(subtopic) if err != nil { return "", errors.Wrap(ErrMalformedSubtopic, err) } - if strings.ContainsAny(subtopic, subtopicInvalidChars) { + if strings.ContainsAny(subtopic, subtopicInvalidChars+natsWildcards) { return "", ErrMalformedSubtopic } - if strings.Contains(subtopic, "..") { + if strings.Contains(subtopic, "//") { return "", ErrMalformedSubtopic } - for _, elem := range strings.Split(subtopic, ".") { - if len(elem) > 1 && strings.ContainsAny(elem, wildcards) { + for _, elem := range strings.Split(subtopic, subtopicSep) { + if len(elem) > 1 && strings.ContainsAny(elem, mqttWildcards) { return "", ErrMalformedSubtopic } } @@ -323,25 +320,24 @@ func ParseSubscribeSubtopic(subtopic string) (parseSubTopic string, err error) { } func formatSubtopic(subtopic string) (string, error) { - subtopic, err := url.QueryUnescape(subtopic) + subtopic, err := url.PathUnescape(subtopic) if err != nil { return "", err } subtopic = strings.TrimPrefix(subtopic, "/") subtopic = strings.TrimSuffix(subtopic, "/") subtopic = strings.TrimSpace(subtopic) - subtopic = pathReplacer.Replace(subtopic) return subtopic, nil } func EncodeTopic(domainID string, channelID string, subtopic string) string { - return fmt.Sprintf("%s.%s", string(MsgTopicPrefix), EncodeTopicSuffix(domainID, channelID, subtopic)) + return fmt.Sprintf("%s/%s", string(MsgTopicPrefix), EncodeTopicSuffix(domainID, channelID, subtopic)) } func EncodeTopicSuffix(domainID string, channelID string, subtopic string) string { - subject := fmt.Sprintf("%s.%s.%s", domainID, string(ChannelTopicPrefix), channelID) + subject := fmt.Sprintf("%s/%s/%s", domainID, string(ChannelTopicPrefix), channelID) if subtopic != "" { - subject = fmt.Sprintf("%s.%s", subject, subtopic) + subject = fmt.Sprintf("%s/%s", subject, subtopic) } return subject } @@ -351,11 +347,7 @@ func EncodeMessageTopic(m *Message) string { } func EncodeMessageMQTTTopic(m *Message) string { - topic := fmt.Sprintf("%s/%s/%s/%s", string(MsgTopicPrefix), m.GetDomain(), string(ChannelTopicPrefix), m.GetChannel()) - if m.GetSubtopic() != "" { - topic = topic + "/" + strings.ReplaceAll(m.GetSubtopic(), ".", "/") - } - return topic + return EncodeTopic(m.GetDomain(), m.GetChannel(), m.GetSubtopic()) } func encodeAdapterTopic(domain, channel, subtopic string, topicType TopicType) string { diff --git a/pkg/messaging/topics_test.go b/pkg/messaging/topics_test.go index 979c1a408..7aa6b1d5a 100644 --- a/pkg/messaging/topics_test.go +++ b/pkg/messaging/topics_test.go @@ -65,7 +65,7 @@ var ParsePublisherTopicTestCases = []struct { topic: "/m/domain123/c/channel456/devices/temp", domainID: "domain123", channelID: "channel456", - subtopic: "devices.temp", + subtopic: "devices/temp", topicType: messaging.MessageType, err: nil, }, @@ -74,7 +74,7 @@ var ParsePublisherTopicTestCases = []struct { topic: "/m/domain123/c/channel456/devices%2Ftemp%2Fdata", domainID: "domain123", channelID: "channel456", - subtopic: "devices.temp.data", + subtopic: "devices/temp/data", topicType: messaging.MessageType, }, { @@ -82,7 +82,7 @@ var ParsePublisherTopicTestCases = []struct { topic: "/m/domain/c/channel/extra/extra2", domainID: "domain", channelID: "channel", - subtopic: "extra.extra2", + subtopic: "extra/extra2", topicType: messaging.MessageType, }, { @@ -98,7 +98,7 @@ var ParsePublisherTopicTestCases = []struct { topic: "/m/domain123/c/channel456/devices/temp/", domainID: "domain123", channelID: "channel456", - subtopic: "devices.temp", + subtopic: "devices/temp", topicType: messaging.MessageType, }, { @@ -253,7 +253,7 @@ var ParseSubscribeTestCases = []struct { topic: "/m/domain123/c/channel456/devices/temp", domainID: "domain123", channelID: "channel456", - subtopic: "devices.temp", + subtopic: "devices/temp", topicType: messaging.MessageType, }, { @@ -261,7 +261,7 @@ var ParseSubscribeTestCases = []struct { topic: "/m/domain123/c/channel456/devices/+/temp/#", domainID: "domain123", channelID: "channel456", - subtopic: "devices.*.temp.>", + subtopic: "devices/+/temp/#", topicType: messaging.MessageType, }, { @@ -277,7 +277,7 @@ var ParseSubscribeTestCases = []struct { topic: "/m/domain123/c/channel456/devices/temp/", domainID: "domain123", channelID: "channel456", - subtopic: "devices.temp", + subtopic: "devices/temp", topicType: messaging.MessageType, }, { @@ -323,11 +323,11 @@ var ParseSubscribeTestCases = []struct { err: messaging.ErrMalformedTopic, }, { - desc: "invalid domain name m/domain*123/c/channel456/devices/+/temp/#", + desc: "valid domain with wildcards m/domain*123/c/channel456/devices/+/temp/#", topic: "m/domain*123/c/channel456/devices/+/temp/#", domainID: "domain*123", channelID: "channel456", - subtopic: "devices.*.temp.>", + subtopic: "devices/+/temp/#", topicType: messaging.MessageType, }, { @@ -430,15 +430,15 @@ func TestEncodeTopic(t *testing.T) { desc: "with subtopic", domainID: "domain1", channelID: "chan1", - subtopic: "dev.sensor.temp", - expected: "m.domain1.c.chan1.dev.sensor.temp", + subtopic: "dev/sensor/temp", + expected: "m/domain1/c/chan1/dev/sensor/temp", }, { desc: "without subtopic", domainID: "domain1", channelID: "chan1", subtopic: "", - expected: "m.domain1.c.chan1", + expected: "m/domain1/c/chan1", }, } @@ -462,15 +462,15 @@ func TestEncodeTopicSuffix(t *testing.T) { desc: "with subtopic", domainID: "domain1", channelID: "chan1", - subtopic: "dev.sensor.temp", - expected: "domain1.c.chan1.dev.sensor.temp", + subtopic: "dev/sensor/temp", + expected: "domain1/c/chan1/dev/sensor/temp", }, { desc: "without subtopic", domainID: "domain1", channelID: "chan1", subtopic: "", - expected: "domain1.c.chan1", + expected: "domain1/c/chan1", }, } @@ -493,9 +493,9 @@ func TestMessage_EncodeTopicSuffix(t *testing.T) { message: &messaging.Message{ Domain: "domainX", Channel: "chanX", - Subtopic: "device.123.status", + Subtopic: "device/123/status", }, - expected: "domainX.c.chanX.device.123.status", + expected: "domainX/c/chanX/device/123/status", }, { desc: "without subtopic", @@ -503,7 +503,7 @@ func TestMessage_EncodeTopicSuffix(t *testing.T) { Domain: "domainY", Channel: "chanY", }, - expected: "domainY.c.chanY", + expected: "domainY/c/chanY", }, } @@ -526,7 +526,7 @@ func TestMessage_EncodeToMQTTTopic(t *testing.T) { message: &messaging.Message{ Domain: "domainA", Channel: "chanA", - Subtopic: "dev.1.temp", + Subtopic: "dev/1/temp", }, expected: "m/domainA/c/chanA/dev/1/temp", }, diff --git a/pkg/sdk/message.go b/pkg/sdk/message.go index 04e176dc0..b03d23bdc 100644 --- a/pkg/sdk/message.go +++ b/pkg/sdk/message.go @@ -24,13 +24,12 @@ type publishRequest struct { } func (sdk mgSDK) SendMessage(ctx context.Context, domainID, topic, msg, secret string) errors.SDKError { - chanNameParts := strings.SplitN(topic, ".", channelParts) + chanNameParts := strings.SplitN(topic, "/", channelParts) chanID := chanNameParts[0] brokerTopic := fmt.Sprintf("m/%s/c/%s", domainID, chanID) if len(chanNameParts) == channelParts { - brokerTopic = fmt.Sprintf("%s/%s", brokerTopic, strings.ReplaceAll(chanNameParts[1], ".", "/")) + brokerTopic = fmt.Sprintf("%s/%s", brokerTopic, chanNameParts[1]) } - data, err := json.Marshal(publishRequest{ Topic: brokerTopic, Payload: []byte(msg), @@ -40,11 +39,11 @@ func (sdk mgSDK) SendMessage(ctx context.Context, domainID, topic, msg, secret s } headers := map[string]string{ - "X-FluxMQ-Password": secret, + "X-FluxMQ-Username": domainID, } reqURL := fmt.Sprintf("%s/publish", sdk.httpAdapterURL) - _, _, sdkErr := sdk.processRequest(ctx, http.MethodPost, reqURL, "", data, headers, http.StatusOK) + _, _, sdkErr := sdk.processRequest(ctx, http.MethodPost, reqURL, secret, data, headers, http.StatusOK) return sdkErr } diff --git a/pkg/sdk/message_test.go b/pkg/sdk/message_test.go index 51565480e..a79bbd5eb 100644 --- a/pkg/sdk/message_test.go +++ b/pkg/sdk/message_test.go @@ -29,8 +29,9 @@ func setupFluxMQ(secret string, expectedTopic ...string) *httptest.Server { mux := http.NewServeMux() mux.HandleFunc("POST /publish", func(w http.ResponseWriter, r *http.Request) { - password := r.Header.Get("X-FluxMQ-Password") - if password == "" || password != secret { + username := r.Header.Get("X-FluxMQ-Username") + auth := r.Header.Get("Authorization") + if username == "" || auth == "" || auth != "Bearer "+secret { http.Error(w, "unauthorized", http.StatusUnauthorized) return } @@ -94,7 +95,7 @@ func TestSendMessage(t *testing.T) { }, { desc: "publish message with subtopic", - topic: "channelID.sub.topic", + topic: "channelID/sub/topic", domainID: "domainID", wantTopic: "m/domainID/c/channelID/sub/topic", msg: `[{"n":"current","t":-1,"v":1.6}]`, diff --git a/pkg/sdk/messages.go b/pkg/sdk/messages.go index a30b30f6f..28fc7e0be 100644 --- a/pkg/sdk/messages.go +++ b/pkg/sdk/messages.go @@ -16,7 +16,7 @@ import ( ) func (sdk mgSDK) ReadMessages(ctx context.Context, pm MessagePageMetadata, chanName, domainID, token string) (MessagesPage, errors.SDKError) { - chanNameParts := strings.SplitN(chanName, ".", channelParts) + chanNameParts := strings.SplitN(chanName, "/", channelParts) chanID := chanNameParts[0] subtopicPart := "" if len(chanNameParts) == channelParts { diff --git a/pkg/sdk/messages_test.go b/pkg/sdk/messages_test.go index afaa41450..637ec6184 100644 --- a/pkg/sdk/messages_test.go +++ b/pkg/sdk/messages_test.go @@ -100,7 +100,7 @@ func TestReadMessages(t *testing.T) { { desc: "read messages successfully with subtopic", token: validToken, - chanName: channelID + ".subtopic", + chanName: channelID + "/subtopic", domainID: validID, messagePageMeta: sdk.MessagePageMetadata{ PageMetadata: sdk.PageMetadata{ diff --git a/pkg/sdk/sdk.go b/pkg/sdk/sdk.go index 28a4e0292..15a51271b 100644 --- a/pkg/sdk/sdk.go +++ b/pkg/sdk/sdk.go @@ -1262,7 +1262,7 @@ type SDK interface { // example: // ctx := context.Background() // msg := '[{"bn":"some-base-name:","bt":1.276020076001e+09, "bu":"A","bver":5, "n":"voltage","u":"V","v":120.1}, {"n":"current","t":-5,"v":1.2}, {"n":"current","t":-4,"v":1.3}]' - // err := sdk.SendMessage(ctx, "domainID", "topic", msg, "clientSecret") + // err := sdk.SendMessage(ctx, "domainID", "76cc9425-9df0-4b53-99b8-8dabbd3444fc/test", msg, "clientSecret") // fmt.Println(err) SendMessage(ctx context.Context, domainID, topic, msg, secret string) smqerrors.SDKError diff --git a/pkg/transformers/json/README.md b/pkg/transformers/json/README.md index a70c01b02..088d2f304 100644 --- a/pkg/transformers/json/README.md +++ b/pkg/transformers/json/README.md @@ -50,5 +50,5 @@ the message format is `myFormat`. It can be any valid subtopic name, JSON transf Having a message format in the subtopic means that the subscriber has an option to subscribe to only one message format. This is a nice feature because message subscribers know what's the expected format of the message so that they can process it. If the message format is not important, wildcard subtopic can always be used to subscribe to any message format: ``` -http://localhost:8185/m//c//home/temperature/* +http://localhost:8185/m//c//home/temperature/# ``` diff --git a/pkg/transformers/json/transformer.go b/pkg/transformers/json/transformer.go index 2dc232fe6..13f5e3655 100644 --- a/pkg/transformers/json/transformer.go +++ b/pkg/transformers/json/transformer.go @@ -61,7 +61,7 @@ func (ts *transformerService) Transform(msg *messaging.Message) (any, error) { return nil, errors.Wrap(ErrTransform, errUnknownFormat) } - subs := strings.Split(ret.Subtopic, ".") + subs := strings.Split(ret.Subtopic, "/") if len(subs) == 0 { return nil, errors.Wrap(ErrTransform, errUnknownFormat) } diff --git a/re/README.md b/re/README.md index 2bd52c3ee..4ea72c83e 100644 --- a/re/README.md +++ b/re/README.md @@ -100,7 +100,7 @@ The service is configured using the following environment variables (values show - **Rule execution**: Runs Lua or Go scripts for incoming messages. - **Multiple outputs**: Channels, alarms, email, SenML writers, remote PostgreSQL, and Slack outputs. - **Scheduling**: Runs rules at specific times with recurring intervals. -- **Filtering and matching**: Input channel filtering and NATS-style topic matching (`*`, `>`). +- **Filtering and matching**: Input channel filtering and MQTT-style topic matching (`+`, `#`). - **Observability**: `/metrics` Prometheus endpoint and Jaeger tracing support. - **Payload limit**: Messages over 100 kB are rejected for processing. @@ -110,7 +110,7 @@ The service is configured using the following environment variables (values show 1. The service subscribes to all internal broker messages. 2. For each message, it lists enabled rules for the same domain and input channel. -3. It matches the rule `input_topic` against the message subtopic using NATS-style wildcards. +3. It matches the rule `input_topic` against the message subtopic using MQTT-style wildcards. 4. The rule logic (Lua or Go) is executed and the result is passed to configured outputs. ### Message payloads @@ -154,6 +154,8 @@ Supported output types (`outputs.OutputType`) and their fields: | `save_remote_pg` | `host`, `port`, `user`, `password`, `database`, `table`, `mapping` | `mapping` is a Go template that must render a JSON object. | | `slack` | `token`, `channel_id`, `message` | `message` is a Go template. | +For `channels` output, `topic` is a slash-delimited subtopic (for example, `alerts/high-temp`). + Templates receive a `Message` (the incoming message) and a `Result` (the script output) value. ## Data model @@ -174,7 +176,7 @@ Defined in `re/postgres/init.go`: | `updated_at` | `TIMESTAMP` | Last update timestamp | | `updated_by` | `VARCHAR(254)` | Last updater user ID | | `input_channel` | `VARCHAR(36)` | Input channel ID | -| `input_topic` | `TEXT` | Input topic (supports wildcards) | +| `input_topic` | `TEXT` | Input topic (supports `+` and `#` wildcards) | | `outputs` | `JSONB` | Output definitions | | `status` | `SMALLINT` | 0 = enabled, 1 = disabled, 2 = deleted | | `logic_type` | `SMALLINT` | 0 = Lua, 1 = Go | @@ -258,7 +260,7 @@ curl -X POST http://localhost:9008//rules \ -d '{ "name": "High Temperature Alert", "input_channel": "sensors", - "input_topic": "temperature.*", + "input_topic": "temperature/+", "logic": { "type": 0, "value": "if message.payload.t > 30 then return {measurement=\"temperature\", value=tostring(message.payload.t), unit=\"C\", threshold=\"30\", cause=\"temp high\", severity=90} end" diff --git a/re/handlers.go b/re/handlers.go index a019bc13b..e823c8a8d 100644 --- a/re/handlers.go +++ b/re/handlers.go @@ -48,7 +48,7 @@ func (re *re) Handle(msg *messaging.Message) error { return err } for _, r := range page.Rules { - if matchSubject(msg.Subtopic, r.InputTopic) { + if matchTopic(msg.Subtopic, r.InputTopic) { go func(ctx context.Context) { re.runInfo <- re.process(ctx, r, msg) }(ctx) @@ -58,20 +58,21 @@ func (re *re) Handle(msg *messaging.Message) error { return nil } -// Match NATS subject to support wildcards. -func matchSubject(published, subscribed string) bool { - p := strings.Split(published, ".") - s := strings.Split(subscribed, ".") +// matchTopic matches a published subtopic against a subscription pattern +// using MQTT-style wildcards: + (single level) and # (multi-level). +func matchTopic(published, subscribed string) bool { + p := strings.Split(published, "/") + s := strings.Split(subscribed, "/") n := len(p) for i := range s { - if s[i] == ">" { + if s[i] == "#" { return true } if i >= n { return false } - if s[i] != "*" && p[i] != s[i] { + if s[i] != "+" && p[i] != s[i] { return false } } diff --git a/re/postgres/init.go b/re/postgres/init.go index f7ad8191d..1122351d4 100644 --- a/re/postgres/init.go +++ b/re/postgres/init.go @@ -62,13 +62,77 @@ func Migration() (*migrate.MemoryMigrationSource, error) { Id: "rules_03", Up: []string{ `UPDATE rules - SET metadata = (COALESCE(metadata, '{}'::jsonb) - 'ui') || jsonb_build_object('flow', metadata->'ui') - WHERE metadata ? 'ui' AND jsonb_typeof(metadata->'ui') = 'string'`, + SET metadata = (COALESCE(metadata, '{}'::jsonb) - 'ui') || jsonb_build_object('flow', metadata->'ui') + WHERE metadata ? 'ui' AND jsonb_typeof(metadata->'ui') = 'string'`, }, Down: []string{ `UPDATE rules - SET metadata = (COALESCE(metadata, '{}'::jsonb) - 'flow') || jsonb_build_object('ui', metadata->'flow') - WHERE metadata ? 'flow' AND jsonb_typeof(metadata->'flow') = 'string'`, + SET metadata = (COALESCE(metadata, '{}'::jsonb) - 'flow') || jsonb_build_object('ui', metadata->'flow') + WHERE metadata ? 'flow' AND jsonb_typeof(metadata->'flow') = 'string'`, + }, + }, + { + Id: "rules_04", + Up: []string{ + // Canonicalize legacy rule topics from dot/NATS wildcards + // to slash/MQTT wildcards. + `UPDATE rules + SET input_topic = REPLACE(REPLACE(REPLACE(input_topic, '>', '#'), '*', '+'), '.', '/') + WHERE input_topic IS NOT NULL AND input_topic <> ''`, + }, + Down: []string{ + `UPDATE rules + SET input_topic = REPLACE(REPLACE(REPLACE(input_topic, '#', '>'), '+', '*'), '/', '.') + WHERE input_topic IS NOT NULL AND input_topic <> ''`, + }, + }, + { + Id: "rules_05", + Up: []string{ + // Canonicalize channel output topics in rule outputs JSON: + // dot/NATS wildcards -> slash/MQTT wildcards. + `UPDATE rules AS r + SET outputs = COALESCE(( + SELECT jsonb_agg( + CASE + WHEN out_elem.elem->>'type' = 'channels' + AND out_elem.elem ? 'topic' + AND jsonb_typeof(out_elem.elem->'topic') = 'string' + THEN jsonb_set( + out_elem.elem, + '{topic}', + to_jsonb(REPLACE(REPLACE(REPLACE(out_elem.elem->>'topic', '>', '#'), '*', '+'), '.', '/')), + false + ) + ELSE out_elem.elem + END + ORDER BY out_elem.ord + ) + FROM jsonb_array_elements(r.outputs) WITH ORDINALITY AS out_elem(elem, ord) + ), '[]'::jsonb) + WHERE jsonb_typeof(r.outputs) = 'array'`, + }, + Down: []string{ + `UPDATE rules AS r + SET outputs = COALESCE(( + SELECT jsonb_agg( + CASE + WHEN out_elem.elem->>'type' = 'channels' + AND out_elem.elem ? 'topic' + AND jsonb_typeof(out_elem.elem->'topic') = 'string' + THEN jsonb_set( + out_elem.elem, + '{topic}', + to_jsonb(REPLACE(REPLACE(REPLACE(out_elem.elem->>'topic', '#', '>'), '+', '*'), '/', '.')), + false + ) + ELSE out_elem.elem + END + ORDER BY out_elem.ord + ) + FROM jsonb_array_elements(r.outputs) WITH ORDINALITY AS out_elem(elem, ord) + ), '[]'::jsonb) + WHERE jsonb_typeof(r.outputs) = 'array'`, }, }, }, diff --git a/reports/README.md b/reports/README.md index 5c041aaef..7e00c475a 100644 --- a/reports/README.md +++ b/reports/README.md @@ -215,6 +215,7 @@ The Reports service supports the following operations: List filters: `offset`, `limit`, `status`, `name`, `order` (`name`, `created_at`, `updated_at`), and `dir` (`asc`, `desc`). Time ranges use relative expressions parsed by `pkg/reltime`, such as `now()` or `now()-24h` (units: `s`, `m`, `h`, `d`, `w`). Aggregation intervals use Go duration strings like `15m` or `1h`. File output formats are `pdf` and `csv`. +When metric `subtopic` is used, provide it in slash-delimited form (for example, `sensor/temp`). ### Example: Generate a report diff --git a/reports/postgres/init.go b/reports/postgres/init.go index 15329caf6..2eecf30c1 100644 --- a/reports/postgres/init.go +++ b/reports/postgres/init.go @@ -54,6 +54,53 @@ func Migration() (*migrate.MemoryMigrationSource, error) { `ALTER TABLE report_config DROP COLUMN report_template;`, }, }, + { + Id: "reports_03", + Up: []string{ + // Canonicalize legacy report metric subtopics from dot/NATS wildcards + // to slash/MQTT wildcards. + `UPDATE report_config AS rc + SET metrics = COALESCE(( + SELECT jsonb_agg( + CASE + WHEN metric.elem ? 'subtopic' + AND jsonb_typeof(metric.elem->'subtopic') = 'string' + THEN jsonb_set( + metric.elem, + '{subtopic}', + to_jsonb(REPLACE(REPLACE(REPLACE(metric.elem->>'subtopic', '>', '#'), '*', '+'), '.', '/')), + false + ) + ELSE metric.elem + END + ORDER BY metric.ord + ) + FROM jsonb_array_elements(rc.metrics) WITH ORDINALITY AS metric(elem, ord) + ), '[]'::jsonb) + WHERE jsonb_typeof(rc.metrics) = 'array'`, + }, + Down: []string{ + `UPDATE report_config AS rc + SET metrics = COALESCE(( + SELECT jsonb_agg( + CASE + WHEN metric.elem ? 'subtopic' + AND jsonb_typeof(metric.elem->'subtopic') = 'string' + THEN jsonb_set( + metric.elem, + '{subtopic}', + to_jsonb(REPLACE(REPLACE(REPLACE(metric.elem->>'subtopic', '#', '>'), '+', '*'), '/', '.')), + false + ) + ELSE metric.elem + END + ORDER BY metric.ord + ) + FROM jsonb_array_elements(rc.metrics) WITH ORDINALITY AS metric(elem, ord) + ), '[]'::jsonb) + WHERE jsonb_typeof(rc.metrics) = 'array'`, + }, + }, }, }