NOISSUE - Update Rules Engine (#88)

* update rules lua processing

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* remove logs

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* add emailer functionality

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* check error

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* remove files

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* fix tests

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* format template

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

---------

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>
This commit is contained in:
Ian Ngethe Muchiri
2025-04-02 19:09:11 +03:00
committed by GitHub
parent dd1df566c7
commit d1e855ccd4
10 changed files with 217 additions and 27 deletions
+17 -3
View File
@@ -14,8 +14,10 @@ import (
"time"
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala/internal/email"
"github.com/absmach/magistrala/re"
httpapi "github.com/absmach/magistrala/re/api"
"github.com/absmach/magistrala/re/emailer"
"github.com/absmach/magistrala/re/middleware"
repg "github.com/absmach/magistrala/re/postgres"
"github.com/absmach/supermq"
@@ -88,6 +90,13 @@ func main() {
}
}
ec := email.Config{}
if err := env.Parse(&ec); err != nil {
logger.Error(fmt.Sprintf("failed to load email configuration : %s", err))
exitCode = 1
return
}
// Create new database for rule engine.
dbConfig := pgclient.Config{Name: defDB}
if err := env.ParseWithOptions(&dbConfig, env.Options{Prefix: envPrefixDB}); err != nil {
@@ -164,7 +173,7 @@ func main() {
defer authzClient.Close()
logger.Info("AuthZ successfully connected to auth gRPC server " + authnClient.Secure())
svc, err := newService(ctx, db, dbConfig, authz, cfg.ESURL, tracer, logger)
svc, err := newService(ctx, db, dbConfig, authz, cfg.ESURL, tracer, ec, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to create services: %s", err))
exitCode = 1
@@ -204,13 +213,18 @@ func main() {
}
}
func newService(ctx context.Context, db *sqlx.DB, dbConfig pgclient.Config, authz mgauthz.Authorization, esURL string, tracer trace.Tracer, logger *slog.Logger) (re.Service, error) {
func newService(ctx context.Context, db *sqlx.DB, dbConfig pgclient.Config, authz mgauthz.Authorization, esURL string, tracer trace.Tracer, ec email.Config, logger *slog.Logger) (re.Service, error) {
database := pgclient.NewDatabase(db, dbConfig, tracer)
repo := repg.NewRepository(database)
idp := uuid.New()
emailerClient, err := emailer.New(&ec)
if err != nil {
logger.Error(fmt.Sprintf("failed to configure e-mailing util: %s", err.Error()))
}
// csvc = authzmw.AuthorizationMiddleware(csvc, authz)
csvc := re.NewService(repo, idp, nil, re.NewTicker(time.Minute))
csvc := re.NewService(repo, idp, nil, re.NewTicker(time.Minute), emailerClient)
csvc = middleware.LoggingMiddleware(csvc, logger)
return csvc, nil
+9
View File
@@ -109,6 +109,15 @@ MG_RE_DB_SSL_CERT=
MG_RE_DB_SSL_KEY=
MG_RE_DB_SSL_ROOT_CERT=
MG_RE_INSTANCE_ID=
MG_RE_EMAIL_TEMPLATE=re.tmpl
MG_EMAIL_HOST=smtp.mailtrap.io
MG_EMAIL_PORT=2525
MG_EMAIL_USERNAME=18bf7f70705139
MG_EMAIL_PASSWORD=2b0d302e775b1e
MG_EMAIL_FROM_ADDRESS=from@example.com
MG_EMAIL_FROM_NAME=Example
MG_EMAIL_TEMPLATE=email.tmpl
### Certs
SMQ_ADDONS_CERTS_PATH_PREFIX=./
+8
View File
@@ -184,11 +184,19 @@ services:
SMQ_SPICEDB_HOST: ${SMQ_SPICEDB_HOST}
SMQ_SPICEDB_PORT: ${SMQ_SPICEDB_PORT}
MG_RE_INSTANCE_ID: ${MG_RE_INSTANCE_ID}
MG_EMAIL_HOST: ${MG_EMAIL_HOST}
MG_EMAIL_PORT: ${MG_EMAIL_PORT}
MG_EMAIL_USERNAME: ${MG_EMAIL_USERNAME}
MG_EMAIL_PASSWORD: ${MG_EMAIL_PASSWORD}
MG_EMAIL_FROM_ADDRESS: ${MG_EMAIL_FROM_ADDRESS}
MG_EMAIL_FROM_NAME: ${MG_EMAIL_FROM_NAME}
MG_EMAIL_TEMPLATE: ${MG_EMAIL_TEMPLATE}
ports:
- ${MG_RE_HTTP_PORT}:${MG_RE_HTTP_PORT}
networks:
- magistrala-base-net
volumes:
- ./templates/${MG_RE_EMAIL_TEMPLATE}:/email.tmpl
# Auth gRPC client certificates
- type: bind
source: ${SMQ_AUTH_GRPC_CLIENT_CERT:-ssl/certs/dummy/client_cert}
+4
View File
@@ -0,0 +1,4 @@
{{.Header}}
{{.Content}}
{{.Footer}}
+10
View File
@@ -0,0 +1,10 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package re
//go:generate mockery --name Emailer --output=./mocks --filename emailer.go --quiet --note "Copyright (c) Abstract Machines"
type Emailer interface {
// SendEmailNotification sends an email to the recipients based on a trigger.
SendEmailNotification(to []string, from, subject, header, user, content, footer string) error
}
+6
View File
@@ -0,0 +1,6 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package emailer contains the domain concept definitions needed to support
// Magistrala re email service functionality.
package emailer
+24
View File
@@ -0,0 +1,24 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package emailer
import (
"github.com/absmach/magistrala/internal/email"
"github.com/absmach/magistrala/re"
)
var _ re.Emailer = (*emailer)(nil)
type emailer struct {
agent *email.Agent
}
func New(a *email.Config) (re.Emailer, error) {
e, err := email.New(a)
return &emailer{agent: e}, err
}
func (e *emailer) SendEmailNotification(to []string, from, subject, header, user, content, footer string) error {
return e.agent.Send(to, from, subject, header, user, content, footer)
}
+44
View File
@@ -0,0 +1,44 @@
// Code generated by mockery v2.43.2. DO NOT EDIT.
// Copyright (c) Abstract Machines
package mocks
import mock "github.com/stretchr/testify/mock"
// Emailer is an autogenerated mock type for the Emailer type
type Emailer struct {
mock.Mock
}
// SendEmailNotification provides a mock function with given fields: to, from, subject, header, user, content, footer
func (_m *Emailer) SendEmailNotification(to []string, from string, subject string, header string, user string, content string, footer string) error {
ret := _m.Called(to, from, subject, header, user, content, footer)
if len(ret) == 0 {
panic("no return value specified for SendEmailNotification")
}
var r0 error
if rf, ok := ret.Get(0).(func([]string, string, string, string, string, string, string) error); ok {
r0 = rf(to, from, subject, header, user, content, footer)
} else {
r0 = ret.Error(0)
}
return r0
}
// NewEmailer creates a new instance of Emailer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewEmailer(t interface {
mock.TestingT
Cleanup(func())
}) *Emailer {
mock := &Emailer{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
+93 -23
View File
@@ -14,6 +14,7 @@ import (
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/messaging"
mgjson "github.com/absmach/supermq/pkg/transformers/json"
"github.com/absmach/supermq/pkg/transformers/senml"
lua "github.com/yuin/gopher-lua"
)
@@ -100,15 +101,17 @@ type re struct {
pubSub messaging.PubSub
errors chan error
ticker Ticker
email Emailer
}
func NewService(repo Repository, idp supermq.IDProvider, pubSub messaging.PubSub, tck Ticker) Service {
func NewService(repo Repository, idp supermq.IDProvider, pubSub messaging.PubSub, tck Ticker, emailer Emailer) Service {
return &re{
repo: repo,
idp: idp,
pubSub: pubSub,
errors: make(chan error),
ticker: tck,
email: emailer,
}
}
@@ -198,24 +201,37 @@ func (re *re) DisableRule(ctx context.Context, session authn.Session, id string)
}
func (re *re) ConsumeAsync(ctx context.Context, msgs interface{}) {
var inputChannel string
switch m := msgs.(type) {
case *messaging.Message:
pm := PageMeta{
InputChannel: m.Channel,
Status: EnabledStatus,
}
page, err := re.repo.ListRules(ctx, pm)
if err != nil {
re.errors <- errors.Wrap(svcerr.ErrViewEntity, err)
inputChannel = m.Channel
case []senml.Message:
if len(m) == 0 {
return
}
for _, r := range page.Rules {
go func(ctx context.Context) {
re.errors <- re.process(ctx, r, m)
}(ctx)
}
message := m[0]
inputChannel = message.Channel
case mgjson.Message:
return
default:
return
}
pm := PageMeta{
InputChannel: inputChannel,
Status: EnabledStatus,
}
page, err := re.repo.ListRules(ctx, pm)
if err != nil {
re.errors <- err
return
}
for _, r := range page.Rules {
go func(ctx context.Context) {
re.errors <- re.process(ctx, r, msgs)
}(ctx)
}
}
@@ -223,27 +239,62 @@ func (re *re) Errors() <-chan error {
return re.errors
}
func (re *re) process(ctx context.Context, r Rule, msg *messaging.Message) error {
func (re *re) process(ctx context.Context, r Rule, msg interface{}) error {
l := lua.NewState()
defer l.Close()
message := l.NewTable()
l.RawSet(message, lua.LString("channel"), lua.LString(msg.Channel))
l.RawSet(message, lua.LString("subtopic"), lua.LString(msg.Subtopic))
l.RawSet(message, lua.LString("publisher"), lua.LString(msg.Publisher))
l.RawSet(message, lua.LString("protocol"), lua.LString(msg.Protocol))
l.RawSet(message, lua.LString("created"), lua.LNumber(msg.Created))
switch m := msg.(type) {
case messaging.Message:
{
l.RawSet(message, lua.LString("channel"), lua.LString(m.Channel))
l.RawSet(message, lua.LString("subtopic"), lua.LString(m.Subtopic))
l.RawSet(message, lua.LString("publisher"), lua.LString(m.Publisher))
l.RawSet(message, lua.LString("protocol"), lua.LString(m.Protocol))
l.RawSet(message, lua.LString("created"), lua.LNumber(m.Created))
pld := l.NewTable()
for i, b := range msg.Payload {
l.RawSet(pld, lua.LNumber(i+1), lua.LNumber(b)) // Lua tables are 1-indexed
pld := l.NewTable()
for i, b := range m.Payload {
l.RawSet(pld, lua.LNumber(i+1), lua.LNumber(b)) // Lua tables are 1-indexed
}
l.RawSet(message, lua.LString("payload"), pld)
}
case []senml.Message:
msg := m[0]
l.RawSet(message, lua.LString("channel"), lua.LString(msg.Channel))
l.RawSet(message, lua.LString("subtopic"), lua.LString(msg.Subtopic))
l.RawSet(message, lua.LString("publisher"), lua.LString(msg.Publisher))
l.RawSet(message, lua.LString("protocol"), lua.LString(msg.Protocol))
l.RawSet(message, lua.LString("name"), lua.LString(msg.Name))
l.RawSet(message, lua.LString("unit"), lua.LString(msg.Unit))
l.RawSet(message, lua.LString("time"), lua.LNumber(msg.Time))
l.RawSet(message, lua.LString("update_time"), lua.LNumber(msg.UpdateTime))
if msg.Value != nil {
l.RawSet(message, lua.LString("value"), lua.LNumber(*msg.Value))
}
if msg.StringValue != nil {
l.RawSet(message, lua.LString("string_value"), lua.LString(*msg.StringValue))
}
if msg.DataValue != nil {
l.RawSet(message, lua.LString("data_value"), lua.LString(*msg.DataValue))
}
if msg.BoolValue != nil {
l.RawSet(message, lua.LString("bool_value"), lua.LBool(*msg.BoolValue))
}
if msg.Sum != nil {
l.RawSet(message, lua.LString("sum"), lua.LNumber(*msg.Sum))
}
}
l.RawSet(message, lua.LString("payload"), pld)
// Set the message object as a Lua global variable.
l.SetGlobal("message", message)
// set the email function as a Lua global function
l.SetGlobal("send_email", l.NewFunction(re.sendEmail))
if err := l.DoString(string(r.Logic.Value)); err != nil {
return err
}
@@ -260,6 +311,7 @@ func (re *re) process(ctx context.Context, r Rule, msg *messaging.Message) error
Publisher: "magistrala.re",
Created: time.Now().Unix(),
Payload: []byte(result.String()),
Channel: r.OutputChannel,
}
return re.pubSub.Publish(ctx, m.Channel, m)
}
@@ -346,3 +398,21 @@ func (r Rule) shouldRun(startTime time.Time) bool {
return false
}
func (re *re) sendEmail(L *lua.LState) int {
recipientsTable := L.ToTable(1)
subject := L.ToString(2)
content := L.ToString(3)
var recipients []string
recipientsTable.ForEach(func(_, value lua.LValue) {
if str, ok := value.(lua.LString); ok {
recipients = append(recipients, string(str))
}
})
if err := re.email.SendEmailNotification(recipients, "", subject, "", "", content, ""); err != nil {
return 0
}
return 1
}
+2 -1
View File
@@ -62,7 +62,8 @@ func newService(t *testing.T) (re.Service, *mocks.Repository, *pubsubmocks.PubSu
mockTicker := new(mocks.Ticker)
idProvider := uuid.NewMock()
pubsub := pubsubmocks.NewPubSub(t)
return re.NewService(repo, idProvider, pubsub, mockTicker), repo, pubsub, mockTicker
e := new(mocks.Emailer)
return re.NewService(repo, idProvider, pubsub, mockTicker, e), repo, pubsub, mockTicker
}
func TestAddRule(t *testing.T) {