NOISSUE - Update RE message handling (#116)

* Refactor RE

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Use Publisher only where possible

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Simplify message payload

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Update Lua code and message preprocessing

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix JSON parsing

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Update SMQ dependency

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Remove case that is never going to happen

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Remove SenML part

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Reorder functions

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Updarte RE setup AND Timescale writer

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Use NATS instead of brokers

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix pubsub

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix RE publishers

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Update JS config description

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix configs and remove unused code

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix linter remarks

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

---------

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>
This commit is contained in:
Dušan Borovčanin
2025-04-15 20:12:27 +02:00
committed by GitHub
parent b3e2f41194
commit 08d727bf6e
15 changed files with 390 additions and 339 deletions
+61 -33
View File
@@ -21,7 +21,6 @@ import (
"github.com/absmach/magistrala/re/middleware"
repg "github.com/absmach/magistrala/re/postgres"
"github.com/absmach/supermq"
"github.com/absmach/supermq/consumers"
smqlog "github.com/absmach/supermq/logger"
authnsvc "github.com/absmach/supermq/pkg/authn/authsvc"
mgauthz "github.com/absmach/supermq/pkg/authz"
@@ -29,7 +28,6 @@ import (
"github.com/absmach/supermq/pkg/grpcclient"
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/messaging/brokers"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
"github.com/absmach/supermq/pkg/messaging/nats"
pgclient "github.com/absmach/supermq/pkg/postgres"
@@ -38,12 +36,15 @@ import (
"github.com/absmach/supermq/pkg/uuid"
"github.com/caarlos0/env/v11"
"github.com/go-chi/chi/v5"
"github.com/nats-io/nats.go/jetstream"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)
const (
svcName = "rules_engine"
svcName = "rules_engine"
channelsTopic = "channels.>"
envPrefixDB = "MG_RE_DB_"
envPrefixHTTP = "MG_RE_HTTP_"
envPrefixAuth = "SMQ_AUTH_GRPC_"
@@ -51,11 +52,6 @@ const (
defSvcHTTPPort = "9008"
)
var (
alarmsPrefix = "alarms"
writersPrefix = "writers"
)
type config struct {
LogLevel string `env:"MG_RE_LOG_LEVEL" envDefault:"info"`
InstanceID string `env:"MG_RE_INSTANCE_ID" envDefault:""`
@@ -65,10 +61,32 @@ type config struct {
CacheURL string `env:"MG_RE_CACHE_URL" envDefault:"redis://localhost:6379/0"`
CacheKeyDuration time.Duration `env:"MG_RE_CACHE_KEY_DURATION" envDefault:"10m"`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ConfigPath string `env:"MG_RE_CONFIG_PATH" envDefault:"/config.toml"`
BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
}
const (
writersCfgName = "writers"
alarmsCfgName = "alarms"
alarmsPrefix = "alarms"
writersPrefix = "writers"
)
var (
writersSubjects = []string{"writers.>"}
alarmsSubjects = []string{"alarms.>"}
)
var jsStreamConfig = jetstream.StreamConfig{
Retention: jetstream.LimitsPolicy,
Description: "SuperMQ Rules Engine stream for handling internal messages",
MaxMsgsPerSubject: 1e6,
MaxAge: time.Hour * 24,
MaxMsgSize: 1024 * 1024,
Discard: jetstream.DiscardOld,
Storage: jetstream.FileStorage,
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
@@ -131,7 +149,7 @@ func main() {
}()
tracer := tp.Tracer(svcName)
rePubSub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger)
rePubSub, err := nats.NewPubSub(ctx, cfg.BrokerURL, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker for rePubSub: %s", err))
exitCode = 1
@@ -139,21 +157,27 @@ func main() {
}
defer rePubSub.Close()
writersPubSub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger, nats.Prefix(writersPrefix))
writersCfg := jsStreamConfig
writersCfg.Name = writersCfgName
writersCfg.Subjects = writersSubjects
writersPub, err := nats.NewPublisher(ctx, cfg.BrokerURL, nats.JSStreamConfig(writersCfg), nats.Prefix(writersPrefix))
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker for writersPubSub: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker for writers publisher: %s", err))
exitCode = 1
return
}
defer writersPubSub.Close()
defer writersPub.Close()
alarmsPubSub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger, nats.Prefix(alarmsPrefix))
alarmsCfg := jsStreamConfig
alarmsCfg.Name = alarmsCfgName
alarmsCfg.Subjects = alarmsSubjects
alarmsPub, err := nats.NewPublisher(ctx, cfg.BrokerURL, nats.JSStreamConfig(alarmsCfg), nats.Prefix(alarmsPrefix))
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker for alarmsPubSub: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker for alarms publisher: %s", err))
exitCode = 1
return
}
defer alarmsPubSub.Close()
defer alarmsPub.Close()
httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
@@ -163,17 +187,8 @@ func main() {
}
rePubSub = brokerstracing.NewPubSub(httpServerConfig, tracer, rePubSub)
writersPubSub = brokerstracing.NewPubSub(httpServerConfig, tracer, writersPubSub)
alarmsPubSub = brokerstracing.NewPubSub(httpServerConfig, tracer, alarmsPubSub)
// Setup new redis cache client
// cacheclient, err := redisclient.Connect(cfg.CacheURL)
// if err != nil {
// logger.Error(err.Error())
// exitCode = 1
// return
// }
// defer cacheclient.Close()
writersPub = brokerstracing.NewPublisher(httpServerConfig, tracer, writersPub)
alarmsPub = brokerstracing.NewPublisher(httpServerConfig, tracer, alarmsPub)
grpcCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&grpcCfg, env.Options{Prefix: envPrefixAuth}); err != nil {
@@ -200,18 +215,31 @@ func main() {
logger.Info("AuthZ successfully connected to auth gRPC server " + authnClient.Secure())
database := pgclient.NewDatabase(db, dbConfig, tracer)
svc, err := newService(ctx, database, rePubSub, writersPubSub, alarmsPubSub, authz, cfg.ESURL, tracer, ec, logger)
svc, err := newService(ctx, database, rePubSub, writersPub, alarmsPub, authz, cfg.ESURL, tracer, ec, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to create services: %s", err))
exitCode = 1
return
}
if err = consumers.Start(ctx, svcName, rePubSub, svc, cfg.ConfigPath, logger); err != nil {
logger.Error(fmt.Sprintf("failed to create Rule Engine: %s", err))
subCfg := messaging.SubscriberConfig{
ID: svcName,
Topic: channelsTopic,
DeliveryPolicy: messaging.DeliverAllPolicy,
Handler: svc,
}
if err := rePubSub.Subscribe(ctx, subCfg); err != nil {
logger.Error(fmt.Sprintf("failed to subscribe to internal message broker: %s", err))
exitCode = 1
return
}
go func() {
for {
err := <-svc.Errors()
logger.Warn("Error handling rule", slog.String("error", err.Error()))
}
}()
mux := chi.NewRouter()
httpSvc := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(svc, authn, mux, logger, cfg.InstanceID), logger)
@@ -240,7 +268,7 @@ func main() {
}
}
func newService(ctx context.Context, db pgclient.Database, rePubSub messaging.PubSub, writersPubSub messaging.PubSub, alarmsPubSub messaging.PubSub, authz mgauthz.Authorization, esURL string, tracer trace.Tracer, ec email.Config, logger *slog.Logger) (re.Service, error) {
func newService(ctx context.Context, db pgclient.Database, rePubSub messaging.PubSub, writersPub, alarmsPub messaging.Publisher, authz mgauthz.Authorization, esURL string, tracer trace.Tracer, ec email.Config, logger *slog.Logger) (re.Service, error) {
repo := repg.NewRepository(db)
idp := uuid.New()
@@ -250,7 +278,7 @@ func newService(ctx context.Context, db pgclient.Database, rePubSub messaging.Pu
}
// csvc = authzmw.AuthorizationMiddleware(csvc, authz)
csvc := re.NewService(repo, idp, rePubSub, writersPubSub, alarmsPubSub, re.NewTicker(time.Minute), emailerClient)
csvc := re.NewService(repo, idp, rePubSub, writersPub, alarmsPub, re.NewTicker(time.Minute), emailerClient)
csvc = middleware.LoggingMiddleware(csvc, logger)
return csvc, nil
+2 -2
View File
@@ -4,8 +4,8 @@
# To listen all messsage broker subjects use default value "writers.>".
# To subscribe to specific subjects use values starting by "writers." and
# followed by a subtopic (e.g ["writers.<channel_id>.sub.topic.x", ...]).
[subjects]
filter = ["writers.>"]
["subscriber"]
subjects = ["writers.>"]
[transformer]
# SenML or JSON
+2 -2
View File
@@ -4,5 +4,5 @@
# To listen all messsage broker subjects use default value "writers.>".
# To subscribe to specific subjects use values starting by "writers." and
# followed by a subtopic (e.g ["writers.<channel_id>.sub.topic.x", ...]).
[subjects]
filter = ["writers.>"]
["subscriber"]
subjects = ["writers.>"]
@@ -63,4 +63,4 @@ services:
networks:
- magistrala-base-net
volumes:
- ./config.toml:/config.toml
- ./addons/timescale-writer/config.toml:${MG_TIMESCALE_WRITER_CONFIG_PATH}
+2 -2
View File
@@ -4,5 +4,5 @@
# To listen all messsage broker subjects use default value "channels.>".
# To subscribe to specific subjects use values starting by "channels." and
# followed by a subtopic (e.g ["channels.<channel_id>.sub.topic.x", ...]).
[subjects]
filter = ["channels.>"]
[subscriber]
subjects = ["channels.>"]
-1
View File
@@ -214,7 +214,6 @@ services:
target: /auth-grpc-server-ca${SMQ_AUTH_GRPC_SERVER_CA_CERTS:+.crt}
bind:
create_host_path: true
- ./re_config.toml:/config.toml
alarms-db:
image: postgres:16.2-alpine
-11
View File
@@ -1,11 +0,0 @@
# Copyright (c) Abstract Machines
# SPDX-License-Identifier: Apache-2.0
# To listen all messsage broker subjects use default value "channels.>".
# To subscribe to specific subjects use values starting by "channels." and
# followed by a subtopic (e.g ["channels.<channel_id>.sub.topic.x", ...]).
[subjects]
filter = ["channels.>"]
[transformer]
format = ""
+2 -2
View File
@@ -5,7 +5,7 @@ go 1.24.2
require (
github.com/0x6flab/namegenerator v1.4.0
github.com/absmach/callhome v0.14.0
github.com/absmach/supermq v0.16.1-0.20250411155830-602025b48bc0
github.com/absmach/supermq v0.16.1-0.20250415080928-62f45991404c
github.com/authzed/authzed-go v1.3.1-0.20250320210445-0cde0d8c71e2
github.com/authzed/grpcutil v0.0.0-20250221190651-1985b19b35b8
github.com/caarlos0/env/v11 v11.3.1
@@ -93,7 +93,7 @@ require (
github.com/moby/sys/user v0.3.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/nats.go v1.41.0
github.com/nats-io/nats.go v1.41.1
github.com/nats-io/nkeys v0.4.10 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/oklog/ulid/v2 v2.1.0 // indirect
+4 -4
View File
@@ -28,8 +28,8 @@ github.com/absmach/mgate v0.4.5 h1:l6RmrEsR9jxkdb9WHUSecmT0HA41TkZZQVffFfUAIfI=
github.com/absmach/mgate v0.4.5/go.mod h1:IvRIHZexZPEIAPmmaJF0L5DY2ERjj+GxRGitOW4s6qo=
github.com/absmach/senml v1.0.7 h1:XLvpw0qxbP2QhOz7KLM2ZRar+vSCpSG/0o0kEvWx3No=
github.com/absmach/senml v1.0.7/go.mod h1:3bRIiNc8hq7l3auMs8gQrpsM5hHy7iDuiLILrf/+MfA=
github.com/absmach/supermq v0.16.1-0.20250411155830-602025b48bc0 h1:vPTeIjQPCgDCMGSA9pICZNcTNpz8fRl5CBlpVewMCwY=
github.com/absmach/supermq v0.16.1-0.20250411155830-602025b48bc0/go.mod h1:dJqO3luvt+zLVuDhxOdsRRCuv945F86Nf1BXxFro+nU=
github.com/absmach/supermq v0.16.1-0.20250415080928-62f45991404c h1:OjaoWbaRFJX42s1xyXxevuODnEyK7l2WryEfho8PWPI=
github.com/absmach/supermq v0.16.1-0.20250415080928-62f45991404c/go.mod h1:H71WVAlZK8ljDQWkzih6iQJ2ZT/IyZUILfAxLQBzLqo=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -341,8 +341,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nats-io/nats.go v1.41.0 h1:PzxEva7fflkd+n87OtQTXqCTyLfIIMFJBpyccHLE2Ko=
github.com/nats-io/nats.go v1.41.0/go.mod h1:wV73x0FSI/orHPSYoyMeJB+KajMDoWyXmFaRrrYaaTo=
github.com/nats-io/nats.go v1.41.1 h1:lCc/i5x7nqXbspxtmXaV4hRguMPHqE/kYltG9knrCdU=
github.com/nats-io/nats.go v1.41.1/go.mod h1:mzHiutcAdZrg6WLfYVKXGseqqow2fWmwlTEUOHsI4jY=
github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc=
github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
+60
View File
@@ -0,0 +1,60 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package re
import (
"context"
"encoding/json"
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/transformers/senml"
lua "github.com/yuin/gopher-lua"
)
func (re *re) save(ctx context.Context, original *messaging.Message) lua.LGFunction {
return func(l *lua.LState) int {
table := l.ToTable(1)
val := convertLua(table)
data, err := json.Marshal(val)
if err != nil {
return 0
}
var message []senml.Message
if err := json.Unmarshal(data, &message); err != nil {
return 0
}
m := &messaging.Message{
Domain: original.Domain,
Publisher: original.Publisher,
Created: original.Created,
Channel: original.Channel,
Subtopic: original.Subtopic,
Protocol: original.Protocol,
Payload: data,
}
if err := re.writersPub.Publish(ctx, original.Channel, m); err != nil {
return 0
}
return 1
}
}
func (re *re) sendEmail(L *lua.LState) int {
recipientsTable := L.ToTable(1)
subject := L.ToString(2)
content := L.ToString(3)
var recipients []string
recipientsTable.ForEach(func(_, value lua.LValue) {
if str, ok := value.(lua.LString); ok {
recipients = append(recipients, string(str))
}
})
if err := re.email.SendEmailNotification(recipients, "", subject, "", "", content, ""); err != nil {
return 0
}
return 1
}
+134
View File
@@ -0,0 +1,134 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package re
import (
"encoding/json"
"github.com/absmach/supermq/pkg/messaging"
"github.com/vadv/gopher-lua-libs/argparse"
"github.com/vadv/gopher-lua-libs/base64"
"github.com/vadv/gopher-lua-libs/crypto"
"github.com/vadv/gopher-lua-libs/db"
"github.com/vadv/gopher-lua-libs/filepath"
"github.com/vadv/gopher-lua-libs/ioutil"
luajson "github.com/vadv/gopher-lua-libs/json"
"github.com/vadv/gopher-lua-libs/regexp"
"github.com/vadv/gopher-lua-libs/storage"
"github.com/vadv/gopher-lua-libs/strings"
luatime "github.com/vadv/gopher-lua-libs/time"
"github.com/vadv/gopher-lua-libs/yaml"
lua "github.com/yuin/gopher-lua"
)
const payloadKey = "payload"
func preload(l *lua.LState) {
db.Preload(l)
ioutil.Preload(l)
luajson.Preload(l)
yaml.Preload(l)
crypto.Preload(l)
regexp.Preload(l)
luatime.Preload(l)
storage.Preload(l)
base64.Preload(l)
argparse.Preload(l)
strings.Preload(l)
filepath.Preload(l)
}
func prepareMsg(l *lua.LState, msg *messaging.Message) lua.LValue {
message := l.NewTable()
message.RawSetString("domain", lua.LString(msg.Domain))
message.RawSetString("channel", lua.LString(msg.Channel))
message.RawSetString("subtopic", lua.LString(msg.Subtopic))
message.RawSetString("publisher", lua.LString(msg.Publisher))
message.RawSetString("protocol", lua.LString(msg.Protocol))
message.RawSetString("created", lua.LNumber(msg.Created))
var payload interface{}
if err := json.Unmarshal(msg.GetPayload(), &payload); err != nil {
pld := l.NewTable()
// If message is not JSON, set binary payload and exit.
for i, b := range msg.Payload {
// Lua tables are 1-indexed.
pld.Insert(i+1, lua.LNumber(b))
}
message.RawSetString(payloadKey, pld)
return message
}
// Payload is JSON, set the correct value.
message.RawSetString(payloadKey, traverseJson(l, payload))
return message
}
func traverseJson(l *lua.LState, value interface{}) lua.LValue {
switch val := value.(type) {
case string:
return lua.LString(val)
case float64:
return lua.LNumber(val)
case int:
return lua.LNumber(float64(val))
case json.Number:
if num, err := val.Float64(); err != nil {
return lua.LNumber(num)
}
return lua.LNil
case bool:
return lua.LBool(val)
case []interface{}:
t := l.NewTable()
for i, j := range val {
t.RawSetInt(i+1, traverseJson(l, j))
}
return t
case map[string]interface{}:
t := l.NewTable()
for k, v := range val {
t.RawSetString(k, traverseJson(l, v))
}
return t
default:
return lua.LNil
}
}
func convertLua(lv lua.LValue) interface{} {
switch v := lv.(type) {
case *lua.LTable:
isArray := true
v.ForEach(func(key, value lua.LValue) {
if key.Type() != lua.LTNumber {
isArray = false
}
})
if isArray {
arr := []interface{}{}
v.ForEach(func(key, value lua.LValue) {
arr = append(arr, convertLua(value))
})
return arr
}
obj := map[string]interface{}{}
v.ForEach(func(key, value lua.LValue) {
obj[key.String()] = convertLua(value)
})
return obj
case lua.LString:
return string(v)
case lua.LNumber:
return float64(v)
case lua.LBool:
return bool(v)
case *lua.LNilType:
return nil
default:
return v.String()
}
}
+9 -5
View File
@@ -175,23 +175,27 @@ func (lm *loggingMiddleware) StartScheduler(ctx context.Context) (err error) {
return lm.svc.StartScheduler(ctx)
}
func (lm *loggingMiddleware) ConsumeAsync(ctx context.Context, msgs interface{}) {
func (lm *loggingMiddleware) Handle(msg *messaging.Message) error {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
}
if m, ok := msgs.(*messaging.Message); ok {
if msg != nil {
args = append(args,
slog.String("channel", m.Channel),
slog.String("payload_size", fmt.Sprintf("%d", len(m.Payload))),
slog.String("channel", msg.Channel),
slog.String("payload_size", fmt.Sprintf("%d", len(msg.Payload))),
)
}
lm.logger.Info("Message consumption completed", args...)
}(time.Now())
lm.svc.ConsumeAsync(ctx, msgs)
return lm.svc.Handle(msg)
}
func (lm *loggingMiddleware) Errors() <-chan error {
return lm.svc.Errors()
}
func (lm *loggingMiddleware) Cancel() error {
return lm.Cancel()
}
+72 -17
View File
@@ -12,6 +12,7 @@ import (
"github.com/absmach/magistrala/re"
"github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/messaging"
mock "github.com/stretchr/testify/mock"
)
@@ -98,38 +99,47 @@ func (_c *Service_AddRule_Call) RunAndReturn(run func(ctx context.Context, sessi
return _c
}
// ConsumeAsync provides a mock function for the type Service
func (_mock *Service) ConsumeAsync(ctx context.Context, messages interface{}) {
_mock.Called(ctx, messages)
return
// Cancel provides a mock function for the type Service
func (_mock *Service) Cancel() error {
ret := _mock.Called()
if len(ret) == 0 {
panic("no return value specified for Cancel")
}
var r0 error
if returnFunc, ok := ret.Get(0).(func() error); ok {
r0 = returnFunc()
} else {
r0 = ret.Error(0)
}
return r0
}
// Service_ConsumeAsync_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ConsumeAsync'
type Service_ConsumeAsync_Call struct {
// Service_Cancel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Cancel'
type Service_Cancel_Call struct {
*mock.Call
}
// ConsumeAsync is a helper method to define mock.On call
// - ctx
// - messages
func (_e *Service_Expecter) ConsumeAsync(ctx interface{}, messages interface{}) *Service_ConsumeAsync_Call {
return &Service_ConsumeAsync_Call{Call: _e.mock.On("ConsumeAsync", ctx, messages)}
// Cancel is a helper method to define mock.On call
func (_e *Service_Expecter) Cancel() *Service_Cancel_Call {
return &Service_Cancel_Call{Call: _e.mock.On("Cancel")}
}
func (_c *Service_ConsumeAsync_Call) Run(run func(ctx context.Context, messages interface{})) *Service_ConsumeAsync_Call {
func (_c *Service_Cancel_Call) Run(run func()) *Service_Cancel_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(interface{}))
run()
})
return _c
}
func (_c *Service_ConsumeAsync_Call) Return() *Service_ConsumeAsync_Call {
_c.Call.Return()
func (_c *Service_Cancel_Call) Return(err error) *Service_Cancel_Call {
_c.Call.Return(err)
return _c
}
func (_c *Service_ConsumeAsync_Call) RunAndReturn(run func(ctx context.Context, messages interface{})) *Service_ConsumeAsync_Call {
_c.Run(run)
func (_c *Service_Cancel_Call) RunAndReturn(run func() error) *Service_Cancel_Call {
_c.Call.Return(run)
return _c
}
@@ -291,6 +301,51 @@ func (_c *Service_Errors_Call) RunAndReturn(run func() <-chan error) *Service_Er
return _c
}
// Handle provides a mock function for the type Service
func (_mock *Service) Handle(msg *messaging.Message) error {
ret := _mock.Called(msg)
if len(ret) == 0 {
panic("no return value specified for Handle")
}
var r0 error
if returnFunc, ok := ret.Get(0).(func(*messaging.Message) error); ok {
r0 = returnFunc(msg)
} else {
r0 = ret.Error(0)
}
return r0
}
// Service_Handle_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Handle'
type Service_Handle_Call struct {
*mock.Call
}
// Handle is a helper method to define mock.On call
// - msg
func (_e *Service_Expecter) Handle(msg interface{}) *Service_Handle_Call {
return &Service_Handle_Call{Call: _e.mock.On("Handle", msg)}
}
func (_c *Service_Handle_Call) Run(run func(msg *messaging.Message)) *Service_Handle_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*messaging.Message))
})
return _c
}
func (_c *Service_Handle_Call) Return(err error) *Service_Handle_Call {
_c.Call.Return(err)
return _c
}
func (_c *Service_Handle_Call) RunAndReturn(run func(msg *messaging.Message) error) *Service_Handle_Call {
_c.Call.Return(run)
return _c
}
// ListRules provides a mock function for the type Service
func (_mock *Service) ListRules(ctx context.Context, session authn.Session, pm re.PageMeta) (re.Page, error) {
ret := _mock.Called(ctx, session, pm)
+37 -233
View File
@@ -5,31 +5,13 @@ package re
import (
"context"
"encoding/json"
"time"
"github.com/absmach/supermq"
"github.com/absmach/supermq/consumers"
"github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/transformers"
mgjson "github.com/absmach/supermq/pkg/transformers/json"
"github.com/absmach/supermq/pkg/transformers/senml"
mgsenml "github.com/absmach/supermq/pkg/transformers/senml"
"github.com/vadv/gopher-lua-libs/argparse"
"github.com/vadv/gopher-lua-libs/base64"
"github.com/vadv/gopher-lua-libs/crypto"
"github.com/vadv/gopher-lua-libs/db"
"github.com/vadv/gopher-lua-libs/filepath"
"github.com/vadv/gopher-lua-libs/ioutil"
luajson "github.com/vadv/gopher-lua-libs/json"
"github.com/vadv/gopher-lua-libs/regexp"
"github.com/vadv/gopher-lua-libs/storage"
"github.com/vadv/gopher-lua-libs/strings"
luatime "github.com/vadv/gopher-lua-libs/time"
"github.com/vadv/gopher-lua-libs/yaml"
lua "github.com/yuin/gopher-lua"
)
@@ -101,7 +83,7 @@ type Page struct {
}
type Service interface {
consumers.AsyncConsumer
messaging.MessageHandler
AddRule(ctx context.Context, session authn.Session, r Rule) (Rule, error)
ViewRule(ctx context.Context, session authn.Session, id string) (Rule, error)
UpdateRule(ctx context.Context, session authn.Session, r Rule) (Rule, error)
@@ -110,35 +92,30 @@ type Service interface {
EnableRule(ctx context.Context, session authn.Session, id string) (Rule, error)
DisableRule(ctx context.Context, session authn.Session, id string) (Rule, error)
StartScheduler(ctx context.Context) error
Errors() <-chan error
}
type re struct {
writersPubSub messaging.PubSub
alarmsPubSub messaging.PubSub
rePubSub messaging.PubSub
idp supermq.IDProvider
repo Repository
errors chan error
ticker Ticker
email Emailer
ts []transformers.Transformer
writersPub messaging.Publisher
alarmsPub messaging.Publisher
rePubSub messaging.PubSub
idp supermq.IDProvider
repo Repository
errors chan error
ticker Ticker
email Emailer
}
func NewService(repo Repository, idp supermq.IDProvider, rePubSub messaging.PubSub, writersPubSub messaging.PubSub, alarmsPubSub messaging.PubSub, tck Ticker, emailer Emailer) Service {
func NewService(repo Repository, idp supermq.IDProvider, rePubSub messaging.PubSub, writersPub, alarmsPub messaging.Publisher, tck Ticker, emailer Emailer) Service {
return &re{
writersPubSub: writersPubSub,
alarmsPubSub: alarmsPubSub,
rePubSub: rePubSub,
repo: repo,
idp: idp,
errors: make(chan error),
ticker: tck,
email: emailer,
// Transformers order is important since SenML is also JSON content type.
ts: []transformers.Transformer{
mgsenml.New(mgsenml.JSON),
mgjson.New(nil),
},
writersPub: writersPub,
alarmsPub: alarmsPub,
rePubSub: rePubSub,
repo: repo,
idp: idp,
errors: make(chan error),
ticker: tck,
email: emailer,
}
}
@@ -227,132 +204,61 @@ func (re *re) DisableRule(ctx context.Context, session authn.Session, id string)
return rule, nil
}
func (re *re) ConsumeAsync(ctx context.Context, msgs interface{}) {
m, ok := msgs.(*messaging.Message)
if !ok {
return
}
inputChannel := m.Channel
for _, t := range re.ts {
if v, err := t.Transform(m); err == nil {
msgs = v
break
}
}
func (re *re) Handle(msg *messaging.Message) error {
inputChannel := msg.Channel
pm := PageMeta{
InputChannel: inputChannel,
Status: EnabledStatus,
}
ctx := context.Background()
page, err := re.repo.ListRules(ctx, pm)
if err != nil {
re.errors <- err
return
return err
}
for _, r := range page.Rules {
go func(ctx context.Context) {
re.errors <- re.process(ctx, r, msgs)
if err := re.process(ctx, r, msg); err != nil {
re.errors <- err
}
}(ctx)
}
return nil
}
func (re *re) Cancel() error {
return nil
}
func (re *re) Errors() <-chan error {
return re.errors
}
func (re *re) process(ctx context.Context, r Rule, msg interface{}) error {
func (re *re) process(ctx context.Context, r Rule, msg *messaging.Message) error {
l := lua.NewState()
defer l.Close()
preload(l)
message := l.NewTable()
messages := l.NewTable()
switch m := msg.(type) {
case *messaging.Message:
{
l.RawSet(message, lua.LString("channel"), lua.LString(m.Channel))
l.RawSet(message, lua.LString("subtopic"), lua.LString(m.Subtopic))
l.RawSet(message, lua.LString("publisher"), lua.LString(m.Publisher))
l.RawSet(message, lua.LString("protocol"), lua.LString(m.Protocol))
l.RawSet(message, lua.LString("created"), lua.LNumber(m.Created))
pld := l.NewTable()
for i, b := range m.Payload {
l.RawSet(pld, lua.LNumber(i+1), lua.LNumber(b)) // Lua tables are 1-indexed
}
l.RawSet(message, lua.LString("payload"), pld)
}
case []mgsenml.Message:
for i, msg := range m {
insert := l.NewTable()
insert.RawSetString("channel", lua.LString(msg.Channel))
insert.RawSetString("subtopic", lua.LString(msg.Subtopic))
insert.RawSetString("publisher", lua.LString(msg.Publisher))
insert.RawSetString("protocol", lua.LString(msg.Protocol))
insert.RawSetString("name", lua.LString(msg.Name))
insert.RawSetString("unit", lua.LString(msg.Unit))
insert.RawSetString("time", lua.LNumber(msg.Time))
insert.RawSetString("update_time", lua.LNumber(msg.UpdateTime))
if msg.Value != nil {
insert.RawSetString("value", lua.LNumber(*msg.Value))
}
if msg.StringValue != nil {
insert.RawSetString("string_value", lua.LString(*msg.StringValue))
}
if msg.DataValue != nil {
insert.RawSetString("data_value", lua.LString(*msg.DataValue))
}
if msg.BoolValue != nil {
insert.RawSetString("bool_value", lua.LBool(*msg.BoolValue))
}
if msg.Sum != nil {
insert.RawSetString("sum", lua.LNumber(*msg.Sum))
}
messages.RawSetInt(i+1, insert) // Lua index starts at 1.
}
if len(m) == 1 {
message = messages.RawGetInt(1).(*lua.LTable)
}
case mgjson.Messages:
for i, msg := range m.Data {
insert := l.NewTable()
insert.RawSetString("channel", lua.LString(msg.Channel))
insert.RawSetString("subtopic", lua.LString(msg.Subtopic))
insert.RawSetString("publisher", lua.LString(msg.Publisher))
insert.RawSetString("protocol", lua.LString(msg.Protocol))
insert.RawSetString("format", lua.LString(m.Format))
traverseJson(l, insert, lua.LString("payload"), map[string]interface{}(msg.Payload))
messages.RawSetInt(i+1, insert) // Lua index starts at 1.
}
if len(m.Data) == 1 {
message = messages.RawGetInt(1).(*lua.LTable)
}
}
message := prepareMsg(l, msg)
// Set the message object as a Lua global variable.
l.SetGlobal("message", message)
l.SetGlobal("messages", messages)
l.SetGlobal("domain_id", lua.LString(r.DomainID))
// set the email function as a Lua global function
// set the email function as a Lua global function.
l.SetGlobal("send_email", l.NewFunction(re.sendEmail))
l.SetGlobal("save_senml", l.NewFunction(re.saveSenml))
l.SetGlobal("save_senml", l.NewFunction(re.save(ctx, msg)))
if err := l.DoString(string(r.Logic.Value)); err != nil {
return err
}
result := l.Get(-1) // Get the last result
result := l.Get(-1) // Get the last result.
switch result {
case lua.LNil:
return nil
default:
if len(r.OutputChannel) == 0 {
if r.OutputChannel == "" {
return nil
}
m := &messaging.Message{
@@ -448,105 +354,3 @@ func (r Rule) shouldRun(startTime time.Time) bool {
return false
}
func (re *re) sendEmail(L *lua.LState) int {
recipientsTable := L.ToTable(1)
subject := L.ToString(2)
content := L.ToString(3)
var recipients []string
recipientsTable.ForEach(func(_, value lua.LValue) {
if str, ok := value.(lua.LString); ok {
recipients = append(recipients, string(str))
}
})
if err := re.email.SendEmailNotification(recipients, "", subject, "", "", content, ""); err != nil {
return 0
}
return 1
}
func (re *re) saveSenml(L *lua.LState) int {
luaString := L.ToString(1)
var message senml.Message
if err := json.Unmarshal([]byte(luaString), &message); err != nil {
return 0
}
payload, err := json.Marshal(message)
if err != nil {
return 0
}
domainId := L.GetGlobal("domain_id").String()
ctx := context.Background()
m := &messaging.Message{
Publisher: message.Publisher,
Created: time.Now().Unix(),
Payload: payload,
Channel: message.Channel,
Subtopic: message.Subtopic,
Domain: domainId,
}
if err := re.writersPubSub.Publish(ctx, message.Channel, m); err != nil {
return 0
}
return 1
}
func preload(l *lua.LState) {
db.Preload(l)
ioutil.Preload(l)
luajson.Preload(l)
yaml.Preload(l)
crypto.Preload(l)
regexp.Preload(l)
luatime.Preload(l)
storage.Preload(l)
base64.Preload(l)
argparse.Preload(l)
strings.Preload(l)
filepath.Preload(l)
}
func traverseJson(l *lua.LState, parent *lua.LTable, key lua.LValue, value interface{}) {
var lval lua.LValue
switch val := value.(type) {
case string:
lval = lua.LString(val)
case float64:
lval = lua.LNumber(val)
case int:
lval = lua.LNumber(float64(val))
case json.Number:
if num, err := val.Float64(); err != nil {
lval = lua.LNumber(num)
}
case bool:
lval = lua.LBool(val)
case []interface{}:
t := l.NewTable()
for i, j := range val {
traverseJson(l, t, lua.LNumber(i+1), j)
}
lval = t
case map[string]interface{}:
t := l.NewTable()
for k, v := range val {
traverseJson(l, t, lua.LString(k), v)
}
lval = t
case []map[string]interface{}:
t := l.NewTable()
for i, j := range val {
traverseJson(l, t, lua.LNumber(i+1), j)
}
lval = t
}
parent.RawSet(key, lval)
return
}
+4 -26
View File
@@ -19,7 +19,6 @@ import (
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/messaging"
pubsubmocks "github.com/absmach/supermq/pkg/messaging/mocks"
mgjson "github.com/absmach/supermq/pkg/transformers/json"
"github.com/absmach/supermq/pkg/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
@@ -545,13 +544,13 @@ func TestDisableRule(t *testing.T) {
}
}
func TestConsumeAsync(t *testing.T) {
func TestHandle(t *testing.T) {
svc, repo, pubmocks, _ := newService(t)
now := time.Now()
cases := []struct {
desc string
message any
message *messaging.Message
pageMeta re.PageMeta
page re.Page
listErr error
@@ -600,32 +599,10 @@ func TestConsumeAsync(t *testing.T) {
},
listErr: nil,
},
{
desc: "consume message with unsupported message type",
message: "unsupported message type",
pageMeta: re.PageMeta{
InputChannel: inputChannel,
Status: re.EnabledStatus,
},
page: re.Page{},
},
{
desc: "consume json message",
message: mgjson.Message{},
pageMeta: re.PageMeta{
InputChannel: inputChannel,
Status: re.EnabledStatus,
},
page: re.Page{},
listErr: nil,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
var err error
repoCall := repo.On("ListRules", mock.Anything, tc.pageMeta).Return(tc.page, tc.listErr).Run(func(args mock.Arguments) {
@@ -635,7 +612,8 @@ func TestConsumeAsync(t *testing.T) {
})
repoCall1 := pubmocks.On("Publish", mock.Anything, mock.Anything, mock.Anything).Return(tc.publishErr)
svc.ConsumeAsync(ctx, tc.message)
err = svc.Handle(tc.message)
assert.Nil(t, err)
assert.True(t, errors.Contains(err, tc.listErr), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.listErr, err))