MG-659 - Add message storage to Rules Engine (#112)

* add save senml function

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* add multiple pubsubs

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* remove ununsed constant

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* add domain id to message

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* use full words for prefixes

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* update prefixes

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

---------

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>
This commit is contained in:
Ian Ngethe Muchiri
2025-04-11 17:25:41 +03:00
committed by GitHub
parent faaf42941d
commit edf835b73a
5 changed files with 97 additions and 33 deletions
+34 -8
View File
@@ -31,6 +31,7 @@ import (
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/messaging/brokers"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
"github.com/absmach/supermq/pkg/messaging/nats"
pgclient "github.com/absmach/supermq/pkg/postgres"
"github.com/absmach/supermq/pkg/server"
httpserver "github.com/absmach/supermq/pkg/server/http"
@@ -50,6 +51,11 @@ const (
defSvcHTTPPort = "9008"
)
var (
alarmsPrefix = "alarms"
writersPrefix = "writers"
)
type config struct {
LogLevel string `env:"MG_RE_LOG_LEVEL" envDefault:"info"`
InstanceID string `env:"MG_RE_INSTANCE_ID" envDefault:""`
@@ -124,13 +130,30 @@ func main() {
}
}()
tracer := tp.Tracer(svcName)
pubSub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger)
rePubSub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker for rePubSub: %s", err))
exitCode = 1
return
}
defer pubSub.Close()
defer rePubSub.Close()
writersPubSub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger, nats.Prefix(writersPrefix))
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker for writersPubSub: %s", err))
exitCode = 1
return
}
defer writersPubSub.Close()
alarmsPubSub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger, nats.Prefix(alarmsPrefix))
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker for alarmsPubSub: %s", err))
exitCode = 1
return
}
defer alarmsPubSub.Close()
httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
@@ -138,7 +161,10 @@ func main() {
exitCode = 1
return
}
pubSub = brokerstracing.NewPubSub(httpServerConfig, tracer, pubSub)
rePubSub = brokerstracing.NewPubSub(httpServerConfig, tracer, rePubSub)
writersPubSub = brokerstracing.NewPubSub(httpServerConfig, tracer, writersPubSub)
alarmsPubSub = brokerstracing.NewPubSub(httpServerConfig, tracer, alarmsPubSub)
// Setup new redis cache client
// cacheclient, err := redisclient.Connect(cfg.CacheURL)
@@ -174,14 +200,14 @@ func main() {
logger.Info("AuthZ successfully connected to auth gRPC server " + authnClient.Secure())
database := pgclient.NewDatabase(db, dbConfig, tracer)
svc, err := newService(ctx, database, pubSub, authz, cfg.ESURL, tracer, ec, logger)
svc, err := newService(ctx, database, rePubSub, writersPubSub, alarmsPubSub, authz, cfg.ESURL, tracer, ec, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to create services: %s", err))
exitCode = 1
return
}
if err = consumers.Start(ctx, svcName, pubSub, svc, cfg.ConfigPath, logger); err != nil {
if err = consumers.Start(ctx, svcName, rePubSub, svc, cfg.ConfigPath, logger); err != nil {
logger.Error(fmt.Sprintf("failed to create Rule Engine: %s", err))
exitCode = 1
return
@@ -214,7 +240,7 @@ func main() {
}
}
func newService(ctx context.Context, db pgclient.Database, pubsub messaging.PubSub, authz mgauthz.Authorization, esURL string, tracer trace.Tracer, ec email.Config, logger *slog.Logger) (re.Service, error) {
func newService(ctx context.Context, db pgclient.Database, rePubSub messaging.PubSub, writersPubSub messaging.PubSub, alarmsPubSub messaging.PubSub, authz mgauthz.Authorization, esURL string, tracer trace.Tracer, ec email.Config, logger *slog.Logger) (re.Service, error) {
repo := repg.NewRepository(db)
idp := uuid.New()
@@ -224,7 +250,7 @@ func newService(ctx context.Context, db pgclient.Database, pubsub messaging.PubS
}
// csvc = authzmw.AuthorizationMiddleware(csvc, authz)
csvc := re.NewService(repo, idp, pubsub, re.NewTicker(time.Minute), emailerClient)
csvc := re.NewService(repo, idp, rePubSub, writersPubSub, alarmsPubSub, re.NewTicker(time.Minute), emailerClient)
csvc = middleware.LoggingMiddleware(csvc, logger)
return csvc, nil
+5 -5
View File
@@ -1,11 +1,11 @@
# Copyright (c) Abstract Machines
# SPDX-License-Identifier: Apache-2.0
# To listen all messsage broker subjects use default value "channels.>".
# To subscribe to specific subjects use values starting by "channels." and
# followed by a subtopic (e.g ["channels.<channel_id>.sub.topic.x", ...]).
[subscriber]
subjects = ["channels.>"]
# To listen all messsage broker subjects use default value "writers.>".
# To subscribe to specific subjects use values starting by "writers." and
# followed by a subtopic (e.g ["writers.<channel_id>.sub.topic.x", ...]).
[subjects]
filter = ["writers.>"]
[transformer]
# SenML or JSON
+4 -4
View File
@@ -1,8 +1,8 @@
# Copyright (c) Abstract Machines
# SPDX-License-Identifier: Apache-2.0
# To listen all messsage broker subjects use default value "channels.>".
# To subscribe to specific subjects use values starting by "channels." and
# followed by a subtopic (e.g ["channels.<channel_id>.sub.topic.x", ...]).
# To listen all messsage broker subjects use default value "writers.>".
# To subscribe to specific subjects use values starting by "writers." and
# followed by a subtopic (e.g ["writers.<channel_id>.sub.topic.x", ...]).
[subjects]
filter = ["channels.>"]
filter = ["writers.>"]
+53 -15
View File
@@ -16,6 +16,7 @@ import (
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/transformers"
mgjson "github.com/absmach/supermq/pkg/transformers/json"
"github.com/absmach/supermq/pkg/transformers/senml"
mgsenml "github.com/absmach/supermq/pkg/transformers/senml"
"github.com/vadv/gopher-lua-libs/argparse"
"github.com/vadv/gopher-lua-libs/base64"
@@ -112,23 +113,27 @@ type Service interface {
}
type re struct {
idp supermq.IDProvider
repo Repository
pubSub messaging.PubSub
errors chan error
ticker Ticker
email Emailer
ts []transformers.Transformer
writersPubSub messaging.PubSub
alarmsPubSub messaging.PubSub
rePubSub messaging.PubSub
idp supermq.IDProvider
repo Repository
errors chan error
ticker Ticker
email Emailer
ts []transformers.Transformer
}
func NewService(repo Repository, idp supermq.IDProvider, pubSub messaging.PubSub, tck Ticker, emailer Emailer) Service {
func NewService(repo Repository, idp supermq.IDProvider, rePubSub messaging.PubSub, writersPubSub messaging.PubSub, alarmsPubSub messaging.PubSub, tck Ticker, emailer Emailer) Service {
return &re{
repo: repo,
idp: idp,
pubSub: pubSub,
errors: make(chan error),
ticker: tck,
email: emailer,
writersPubSub: writersPubSub,
alarmsPubSub: alarmsPubSub,
rePubSub: rePubSub,
repo: repo,
idp: idp,
errors: make(chan error),
ticker: tck,
email: emailer,
// Transformers order is important since SenML is also JSON content type.
ts: []transformers.Transformer{
mgsenml.New(mgsenml.JSON),
@@ -332,9 +337,11 @@ func (re *re) process(ctx context.Context, r Rule, msg interface{}) error {
// Set the message object as a Lua global variable.
l.SetGlobal("message", message)
l.SetGlobal("messages", messages)
l.SetGlobal("domain_id", lua.LString(r.DomainID))
// set the email function as a Lua global function
l.SetGlobal("send_email", l.NewFunction(re.sendEmail))
l.SetGlobal("save_senml", l.NewFunction(re.saveSenml))
if err := l.DoString(string(r.Logic.Value)); err != nil {
return err
@@ -356,7 +363,7 @@ func (re *re) process(ctx context.Context, r Rule, msg interface{}) error {
Domain: r.DomainID,
Subtopic: r.OutputTopic,
}
return re.pubSub.Publish(ctx, m.Channel, m)
return re.rePubSub.Publish(ctx, m.Channel, m)
}
}
@@ -460,6 +467,37 @@ func (re *re) sendEmail(L *lua.LState) int {
return 1
}
func (re *re) saveSenml(L *lua.LState) int {
luaString := L.ToString(1)
var message senml.Message
if err := json.Unmarshal([]byte(luaString), &message); err != nil {
return 0
}
payload, err := json.Marshal(message)
if err != nil {
return 0
}
domainId := L.GetGlobal("domain_id").String()
ctx := context.Background()
m := &messaging.Message{
Publisher: message.Publisher,
Created: time.Now().Unix(),
Payload: payload,
Channel: message.Channel,
Subtopic: message.Subtopic,
Domain: domainId,
}
if err := re.writersPubSub.Publish(ctx, message.Channel, m); err != nil {
return 0
}
return 1
}
func preload(l *lua.LState) {
db.Preload(l)
ioutil.Preload(l)
+1 -1
View File
@@ -63,7 +63,7 @@ func newService(t *testing.T) (re.Service, *mocks.Repository, *pubsubmocks.PubSu
idProvider := uuid.NewMock()
pubsub := pubsubmocks.NewPubSub(t)
e := new(mocks.Emailer)
return re.NewService(repo, idProvider, pubsub, mockTicker, e), repo, pubsub, mockTicker
return re.NewService(repo, idProvider, pubsub, pubsub, pubsub, mockTicker, e), repo, pubsub, mockTicker
}
func TestAddRule(t *testing.T) {