NOISSUE - Add script outputs (#121)

* Update RE to use pure Go instead of Lua bindings

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix RE DB

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix nil error case

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix adding query

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix constraints on kind and logic type

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Update RE to use multiple outputs

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Update PG writer output

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix protocol error in MQTT forwareder

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix rules error handings

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Add false value check

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix topic filtering

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix consumers

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix publisher

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix mocks

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix tests

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

---------

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>
This commit is contained in:
Dušan Borovčanin
2025-04-24 23:27:08 +02:00
committed by GitHub
parent 2a65b6b655
commit 4e9480266e
14 changed files with 421 additions and 333 deletions
+8 -5
View File
@@ -176,9 +176,10 @@ func main() {
}
defer authnClient.Close()
logger.Info("AuthN successfully connected to auth gRPC server " + authnClient.Secure())
errs := make(chan error)
database := pgclient.NewDatabase(db, dbConfig, tracer)
svc, err := newService(database, msgSub, writersPub, alarmsPub, ec, logger)
svc, err := newService(database, errs, msgSub, writersPub, alarmsPub, ec, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to create services: %s", err))
exitCode = 1
@@ -200,8 +201,10 @@ func main() {
go func() {
for {
err := <-svc.Errors()
logger.Warn("Error handling rule", slog.String("error", err.Error()))
err := <-errs
if err != nil {
logger.Warn("Error handling rule", slog.String("error", err.Error()))
}
}
}()
@@ -231,7 +234,7 @@ func main() {
}
}
func newService(db pgclient.Database, rePubSub messaging.PubSub, writersPub, alarmsPub messaging.Publisher, ec email.Config, logger *slog.Logger) (re.Service, error) {
func newService(db pgclient.Database, errs chan error, rePubSub messaging.PubSub, writersPub, alarmsPub messaging.Publisher, ec email.Config, logger *slog.Logger) (re.Service, error) {
repo := repg.NewRepository(db)
idp := uuid.New()
@@ -240,7 +243,7 @@ func newService(db pgclient.Database, rePubSub messaging.PubSub, writersPub, ala
logger.Error(fmt.Sprintf("failed to configure e-mailing util: %s", err.Error()))
}
csvc := re.NewService(repo, idp, rePubSub, writersPub, alarmsPub, re.NewTicker(time.Minute), emailerClient)
csvc := re.NewService(repo, errs, idp, rePubSub, writersPub, alarmsPub, re.NewTicker(time.Minute), emailerClient)
csvc = middleware.LoggingMiddleware(csvc, logger)
return csvc, nil
+2 -2
View File
@@ -16,7 +16,7 @@ func Migration() *migrate.MemoryMigrationSource {
time BIGINT NOT NULL,
channel UUID,
subtopic VARCHAR(254),
publisher UUID,
publisher VARCHAR(254),
protocol TEXT,
name VARCHAR(254),
unit TEXT,
@@ -26,7 +26,7 @@ func Migration() *migrate.MemoryMigrationSource {
data_value BYTEA,
sum FLOAT,
update_time FLOAT,
PRIMARY KEY (time, publisher, subtopic, name)
PRIMARY KEY (time, publisher, channel, subtopic, name)
);
SELECT create_hypertable('messages', 'time', create_default_indexes => FALSE, chunk_time_interval => 86400000, if_not_exists => TRUE);`,
},
+3 -3
View File
@@ -5,7 +5,7 @@ go 1.24.2
require (
github.com/0x6flab/namegenerator v1.4.0
github.com/absmach/callhome v0.14.0
github.com/absmach/supermq v0.16.1-0.20250415080928-62f45991404c
github.com/absmach/supermq v0.16.1-0.20250421184358-c14bb6e18a82
github.com/authzed/authzed-go v1.4.0
github.com/authzed/grpcutil v0.0.0-20250221190651-1985b19b35b8
github.com/caarlos0/env/v11 v11.3.1
@@ -85,7 +85,7 @@ require (
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jzelinskie/stringz v0.0.3 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/lib/pq v1.10.9
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-sqlite3 v1.14.22 // indirect
@@ -131,7 +131,7 @@ require (
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.37.0 // indirect
golang.org/x/net v0.37.0 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/text v0.24.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb // indirect
+4 -4
View File
@@ -28,8 +28,8 @@ github.com/absmach/mgate v0.4.5 h1:l6RmrEsR9jxkdb9WHUSecmT0HA41TkZZQVffFfUAIfI=
github.com/absmach/mgate v0.4.5/go.mod h1:IvRIHZexZPEIAPmmaJF0L5DY2ERjj+GxRGitOW4s6qo=
github.com/absmach/senml v1.0.7 h1:XLvpw0qxbP2QhOz7KLM2ZRar+vSCpSG/0o0kEvWx3No=
github.com/absmach/senml v1.0.7/go.mod h1:3bRIiNc8hq7l3auMs8gQrpsM5hHy7iDuiLILrf/+MfA=
github.com/absmach/supermq v0.16.1-0.20250415080928-62f45991404c h1:OjaoWbaRFJX42s1xyXxevuODnEyK7l2WryEfho8PWPI=
github.com/absmach/supermq v0.16.1-0.20250415080928-62f45991404c/go.mod h1:H71WVAlZK8ljDQWkzih6iQJ2ZT/IyZUILfAxLQBzLqo=
github.com/absmach/supermq v0.16.1-0.20250421184358-c14bb6e18a82 h1:X4zz8oLH6kGFugsuMwn8Zs9kZ1dZgDXopqPBEVSEK7I=
github.com/absmach/supermq v0.16.1-0.20250421184358-c14bb6e18a82/go.mod h1:1EhTeUYANhKiQN4rYaoXJW+sRz2rv0AdTwPMxlPc+Hw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -577,8 +577,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c=
golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.29.0 h1:WdYw2tdTK1S8olAzWHdgeqfy+Mtm9XNhv/xJsY65d98=
+54 -39
View File
@@ -10,49 +10,15 @@ import (
"encoding/json"
"github.com/absmach/magistrala/alarms"
"github.com/absmach/senml"
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/transformers/senml"
lua "github.com/yuin/gopher-lua"
)
func (re *re) save(ctx context.Context, original *messaging.Message) lua.LGFunction {
return func(l *lua.LState) int {
table := l.ToTable(1)
val := convertLua(table)
// In case there is a single SenML value, convert to slice so we can unmarshal.
if _, ok := val.([]any); !ok {
val = []any{val}
}
data, err := json.Marshal(val)
if err != nil {
return 0
}
var message []senml.Message
if err := json.Unmarshal(data, &message); err != nil {
return 0
}
m := &messaging.Message{
Domain: original.Domain,
Publisher: original.Publisher,
Created: original.Created,
Channel: original.Channel,
Subtopic: original.Subtopic,
Protocol: original.Protocol,
Payload: data,
}
if err := re.writersPub.Publish(ctx, original.Channel, m); err != nil {
return 0
}
return 1
}
}
func (re *re) sendEmail(L *lua.LState) int {
recipientsTable := L.ToTable(1)
subject := L.ToString(2)
content := L.ToString(3)
func (re *re) sendEmail(l *lua.LState) int {
recipientsTable := l.ToTable(1)
subject := l.ToString(2)
content := l.ToString(3)
var recipients []string
recipientsTable.ForEach(func(_, value lua.LValue) {
@@ -121,3 +87,52 @@ func (re *re) sendAlarm(ctx context.Context, ruleID string, original *messaging.
return 1
}
}
func (re *re) saveSenml(ctx context.Context, val interface{}, msg *messaging.Message) error {
// In case there is a single SenML value, convert to slice so we can decode.
if _, ok := val.([]any); !ok {
val = []any{val}
}
data, err := json.Marshal(val)
if err != nil {
return err
}
if _, err := senml.Decode(data, senml.JSON); err != nil {
return err
}
m := &messaging.Message{
Domain: msg.Domain,
Publisher: msg.Publisher,
Created: msg.Created,
Channel: msg.Channel,
Subtopic: msg.Subtopic,
Protocol: msg.Protocol,
Payload: data,
}
if err := re.writersPub.Publish(ctx, msg.Channel, m); err != nil {
return err
}
return nil
}
func (re *re) publishChannel(ctx context.Context, val interface{}, channel, subtopic string, msg *messaging.Message) error {
data, err := json.Marshal(val)
if err != nil {
return err
}
m := &messaging.Message{
Domain: msg.Domain,
Publisher: msg.Publisher,
Created: msg.Created,
Channel: channel,
Subtopic: subtopic,
Protocol: protocol,
Payload: data,
}
if err := re.rePubSub.Publish(ctx, channel, m); err != nil {
return err
}
return nil
}
+178
View File
@@ -0,0 +1,178 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package re
import (
"context"
"strconv"
"time"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/messaging"
lua "github.com/yuin/gopher-lua"
)
const (
maxPayload = 100 * 1024
pldExceededFmt = "max payload size of 100kB exceeded: "
)
func (re *re) Handle(msg *messaging.Message) error {
// Limit payload for RE so we don't get to process large JSON.
if n := len(msg.Payload); n > maxPayload {
return errors.New(pldExceededFmt + strconv.Itoa(n))
}
inputChannel := msg.Channel
pm := PageMeta{
InputChannel: inputChannel,
Status: EnabledStatus,
InputTopic: &msg.Subtopic,
}
ctx := context.Background()
page, err := re.repo.ListRules(ctx, pm)
if err != nil {
return err
}
for _, r := range page.Rules {
go func(ctx context.Context) {
if err := re.process(ctx, r, msg); err != nil {
re.errors <- err
}
}(ctx)
}
return nil
}
func (re *re) process(ctx context.Context, r Rule, msg *messaging.Message) error {
l := lua.NewState()
defer l.Close()
preload(l)
message := prepareMsg(l, msg)
// Set the message object as a Lua global variable.
l.SetGlobal("message", message)
// set the email function as a Lua global function.
l.SetGlobal("send_email", l.NewFunction(re.sendEmail))
l.SetGlobal("send_alarm", l.NewFunction(re.sendAlarm(ctx, r.ID, msg)))
if err := l.DoString(r.Logic.Value); err != nil {
return err
}
// Get the last result.
result := l.Get(-1)
if result == lua.LNil {
return nil
}
var err error
for _, o := range r.Logic.Outputs {
val := convertLua(result)
// If value is false, don't run the follow-up.
if v, ok := val.(bool); ok && !v {
return nil
}
if e := re.handleOutput(ctx, o, r, msg, val); e != nil {
err = errors.Wrap(e, err)
}
}
return err
}
func (re *re) handleOutput(ctx context.Context, o ScriptOutput, r Rule, msg *messaging.Message, val interface{}) error {
switch o {
case Channels:
if r.OutputChannel == "" {
return nil
}
return re.publishChannel(ctx, val, r.OutputChannel, r.OutputTopic, msg)
case SaveSenML:
return re.saveSenml(ctx, val, msg)
case Email:
break
}
return nil
}
func (re *re) StartScheduler(ctx context.Context) error {
defer re.ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-re.ticker.Tick():
startTime := time.Now()
pm := PageMeta{
Status: EnabledStatus,
ScheduledBefore: &startTime,
}
page, err := re.repo.ListRules(ctx, pm)
if err != nil {
return err
}
for _, rule := range page.Rules {
if rule.shouldRun(startTime) {
go func(r Rule) {
msg := &messaging.Message{
Channel: r.InputChannel,
Created: startTime.Unix(),
}
re.errors <- re.process(ctx, r, msg)
}(rule)
}
}
}
}
}
func (r Rule) shouldRun(startTime time.Time) bool {
// Don't run if the rule's start time is in the future
// This allows scheduling rules to start at a specific future time
if r.Schedule.StartDateTime.After(startTime) {
return false
}
t := r.Schedule.Time.Truncate(time.Minute).UTC()
startTimeOnly := time.Date(0, 1, 1, startTime.Hour(), startTime.Minute(), 0, 0, time.UTC)
if t.Equal(startTimeOnly) {
return true
}
if r.Schedule.RecurringPeriod == 0 {
return false
}
period := int(r.Schedule.RecurringPeriod)
switch r.Schedule.Recurring {
case Daily:
if r.Schedule.RecurringPeriod > 0 {
daysSinceStart := startTime.Sub(r.Schedule.StartDateTime).Hours() / hoursInDay
if int(daysSinceStart)%period == 0 {
return true
}
}
case Weekly:
if r.Schedule.RecurringPeriod > 0 {
weeksSinceStart := startTime.Sub(r.Schedule.StartDateTime).Hours() / (hoursInDay * daysInWeek)
if int(weeksSinceStart)%period == 0 {
return true
}
}
case Monthly:
if r.Schedule.RecurringPeriod > 0 {
monthsSinceStart := (startTime.Year()-r.Schedule.StartDateTime.Year())*monthsInYear +
int(startTime.Month()-r.Schedule.StartDateTime.Month())
if monthsSinceStart%period == 0 {
return true
}
}
}
return false
}
+15 -15
View File
@@ -195,25 +195,25 @@ func (lm *loggingMiddleware) StartScheduler(ctx context.Context) (err error) {
return lm.svc.StartScheduler(ctx)
}
func (lm *loggingMiddleware) Handle(msg *messaging.Message) error {
func (lm *loggingMiddleware) Handle(msg *messaging.Message) (err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
// Log only errors since we consume a lot of messages.
if err != nil {
args := []any{
slog.String("duration", time.Since(begin).String()),
}
if msg != nil {
args = append(args,
slog.String("channel", msg.Channel),
slog.String("payload_size", fmt.Sprintf("%d", len(msg.Payload))),
)
}
lm.logger.Warn("Message consumption completed", args...)
}
if msg != nil {
args = append(args,
slog.String("channel", msg.Channel),
slog.String("payload_size", fmt.Sprintf("%d", len(msg.Payload))),
)
}
lm.logger.Info("Message consumption completed", args...)
}(time.Now())
return lm.svc.Handle(msg)
}
func (lm *loggingMiddleware) Errors() <-chan error {
return lm.svc.Errors()
err = lm.svc.Handle(msg)
return
}
func (lm *loggingMiddleware) Cancel() error {
-46
View File
@@ -255,52 +255,6 @@ func (_c *Service_EnableRule_Call) RunAndReturn(run func(ctx context.Context, se
return _c
}
// Errors provides a mock function for the type Service
func (_mock *Service) Errors() <-chan error {
ret := _mock.Called()
if len(ret) == 0 {
panic("no return value specified for Errors")
}
var r0 <-chan error
if returnFunc, ok := ret.Get(0).(func() <-chan error); ok {
r0 = returnFunc()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan error)
}
}
return r0
}
// Service_Errors_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Errors'
type Service_Errors_Call struct {
*mock.Call
}
// Errors is a helper method to define mock.On call
func (_e *Service_Expecter) Errors() *Service_Errors_Call {
return &Service_Errors_Call{Call: _e.mock.On("Errors")}
}
func (_c *Service_Errors_Call) Run(run func()) *Service_Errors_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *Service_Errors_Call) Return(errCh <-chan error) *Service_Errors_Call {
_c.Call.Return(errCh)
return _c
}
func (_c *Service_Errors_Call) RunAndReturn(run func() <-chan error) *Service_Errors_Call {
_c.Call.Return(run)
return _c
}
// Handle provides a mock function for the type Service
func (_mock *Service) Handle(msg *messaging.Message) error {
ret := _mock.Called(msg)
+2 -1
View File
@@ -30,7 +30,8 @@ func Migration() *migrate.MemoryMigrationSource {
output_channel VARCHAR(36),
output_topic TEXT,
status SMALLINT NOT NULL DEFAULT 0 CHECK (status >= 0),
logic_type SMALLINT NOT NULL DEFAULT 0 CHECK (status >= 0),
logic_type SMALLINT NOT NULL DEFAULT 0 CHECK (logic_type >= 0),
logic_output SMALLINT[] NOT NULL DEFAULT '{}',
logic_value BYTEA,
time TIMESTAMP,
recurring SMALLINT,
+12 -10
View File
@@ -24,11 +24,11 @@ func NewRepository(db postgres.Database) re.Repository {
func (repo *PostgresRepository) AddRule(ctx context.Context, r re.Rule) (re.Rule, error) {
q := `
INSERT INTO rules (id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value,
INSERT INTO rules (id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status)
VALUES (:id, :name, :domain_id, :metadata, :input_channel, :input_topic, :logic_type, :logic_value,
VALUES (:id, :name, :domain_id, :metadata, :input_channel, :input_topic, :logic_type, :logic_output, :logic_value,
:output_channel, :output_topic, :start_datetime, :time, :recurring, :recurring_period, :created_at, :created_by, :updated_at, :updated_by, :status)
RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value,
RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;
`
dbr, err := ruleToDb(r)
@@ -58,7 +58,7 @@ func (repo *PostgresRepository) AddRule(ctx context.Context, r re.Rule) (re.Rule
func (repo *PostgresRepository) ViewRule(ctx context.Context, id string) (re.Rule, error) {
q := `
SELECT id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value, output_channel,
SELECT id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_output, logic_value, output_channel,
output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status
FROM rules
WHERE id = $1;
@@ -84,7 +84,7 @@ func (repo *PostgresRepository) UpdateRuleStatus(ctx context.Context, id string,
UPDATE rules
SET status = $2
WHERE id = $1
RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value,
RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;
`
row := repo.DB.QueryRowxContext(ctx, q, id, status)
@@ -127,8 +127,9 @@ func (repo *PostgresRepository) UpdateRule(ctx context.Context, r re.Rule) (re.R
query = append(query, "output_topic = :output_topic,")
}
if r.Logic.Value != "" {
query = append(query, "logic_value = :logic_value,")
query = append(query, "logic_type = :logic_type,")
query = append(query, "logic_output = :logic_output,")
query = append(query, "logic_value = :logic_value,")
}
if len(query) > 0 {
@@ -138,7 +139,7 @@ func (repo *PostgresRepository) UpdateRule(ctx context.Context, r re.Rule) (re.R
q := fmt.Sprintf(`
UPDATE rules
SET %s updated_at = :updated_at, updated_by = :updated_by WHERE id = :id
RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value,
RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;
`, upq)
@@ -171,7 +172,7 @@ func (repo *PostgresRepository) UpdateRuleSchedule(ctx context.Context, r re.Rul
UPDATE rules
SET start_datetime = :start_datetime, time = :time, recurring = :recurring,
recurring_period = :recurring_period, updated_at = :updated_at, updated_by = :updated_by WHERE id = :id
RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value,
RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;
`
dbr, err := ruleToDb(r)
@@ -230,10 +231,11 @@ func (repo *PostgresRepository) ListRules(ctx context.Context, pm re.PageMeta) (
}
pq := pageQuery(pm)
q := fmt.Sprintf(`
SELECT id, name, domain_id, input_channel, input_topic, logic_type, logic_value, output_channel,
SELECT id, name, domain_id, input_channel, input_topic, logic_type, logic_output, logic_value, output_channel,
output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status
FROM rules r %s %s;
`, pq, pgData)
rows, err := repo.DB.NamedQueryContext(ctx, q, pm)
if err != nil {
return re.Page{}, err
@@ -274,7 +276,7 @@ func pageQuery(pm re.PageMeta) string {
if pm.InputChannel != "" {
query = append(query, "r.input_channel = :input_channel")
}
if pm.InputTopic != "" {
if pm.InputTopic != nil {
query = append(query, "r.input_topic = :input_topic")
}
if pm.OutputChannel != "" {
+14 -2
View File
@@ -10,6 +10,7 @@ import (
"github.com/absmach/magistrala/re"
"github.com/absmach/supermq/pkg/errors"
"github.com/lib/pq"
)
// dbRule represents the database structure for a Rule.
@@ -21,6 +22,7 @@ type dbRule struct {
InputChannel string `db:"input_channel"`
InputTopic sql.NullString `db:"input_topic"`
LogicType re.ScriptType `db:"logic_type"`
LogicOutputs pq.Int32Array `db:"logic_output"`
LogicValue string `db:"logic_value"`
OutputChannel sql.NullString `db:"output_channel"`
OutputTopic sql.NullString `db:"output_topic"`
@@ -44,6 +46,10 @@ func ruleToDb(r re.Rule) (dbRule, error) {
}
metadata = b
}
lo := pq.Int32Array{}
for _, v := range r.Logic.Outputs {
lo = append(lo, int32(v))
}
return dbRule{
ID: r.ID,
Name: r.Name,
@@ -52,6 +58,7 @@ func ruleToDb(r re.Rule) (dbRule, error) {
InputChannel: r.InputChannel,
InputTopic: toNullString(r.InputTopic),
LogicType: r.Logic.Type,
LogicOutputs: lo,
LogicValue: r.Logic.Value,
OutputChannel: toNullString(r.OutputChannel),
OutputTopic: toNullString(r.OutputTopic),
@@ -74,6 +81,10 @@ func dbToRule(dto dbRule) (re.Rule, error) {
return re.Rule{}, errors.Wrap(errors.ErrMalformedEntity, err)
}
}
lo := []re.ScriptOutput{}
for _, v := range dto.LogicOutputs {
lo = append(lo, re.ScriptOutput(v))
}
return re.Rule{
ID: dto.ID,
Name: dto.Name,
@@ -82,8 +93,9 @@ func dbToRule(dto dbRule) (re.Rule, error) {
InputChannel: dto.InputChannel,
InputTopic: fromNullString(dto.InputTopic),
Logic: re.Script{
Type: dto.LogicType,
Value: dto.LogicValue,
Outputs: lo,
Type: dto.LogicType,
Value: dto.LogicValue,
},
OutputChannel: fromNullString(dto.OutputChannel),
OutputTopic: fromNullString(dto.OutputTopic),
+103
View File
@@ -0,0 +1,103 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package re
import (
"encoding/json"
"strings"
"time"
"github.com/absmach/supermq/pkg/errors"
)
var ErrInvalidRecurringType = errors.New("invalid recurring type")
const (
hoursInDay = 24
daysInWeek = 7
monthsInYear = 12
protocol = "nats"
)
// ScriptOutput is the indicator for type of the logic
// so we can move it to the Go instead calling Go from Lua.
type ScriptOutput uint
const (
Channels ScriptOutput = iota
Alarms
SaveSenML
Email
SaveRemotePg
)
var (
scriptKindToString = [...]string{"channels", "alarms", "save_senml", "email", "save_remote_pg"}
stringToScriptKind = map[string]ScriptOutput{
"channels": Channels,
"alarms": Alarms,
"save_senml": SaveSenML,
"email": Email,
"save_remote_pg": SaveRemotePg,
}
)
func (s ScriptOutput) String() string {
if int(s) < 0 || int(s) >= len(scriptKindToString) {
return "unknown"
}
return scriptKindToString[s]
}
// MarshalJSON converts ScriptOutput to JSON.
func (s ScriptOutput) MarshalJSON() ([]byte, error) {
return json.Marshal(s.String())
}
// UnmarshalJSON parses JSON string into ScriptOutput.
func (s *ScriptOutput) UnmarshalJSON(data []byte) error {
var str string
if err := json.Unmarshal(data, &str); err != nil {
return err
}
lower := strings.ToLower(str)
if val, ok := stringToScriptKind[lower]; ok {
*s = val
return nil
}
return errors.New("invalid ScriptOutput: " + str)
}
type (
// ScriptType indicates Runtime type for the future versions
// that will support JS or Go runtimes alongside Lua.
ScriptType uint
Metadata map[string]interface{}
Script struct {
Type ScriptType `json:"type"`
Outputs []ScriptOutput `json:"outputs"`
Value string `json:"value"`
}
)
type Rule struct {
ID string `json:"id"`
Name string `json:"name"`
DomainID string `json:"domain"`
Metadata Metadata `json:"metadata,omitempty"`
InputChannel string `json:"input_channel"`
InputTopic string `json:"input_topic"`
Logic Script `json:"logic"`
OutputChannel string `json:"output_channel,omitempty"`
OutputTopic string `json:"output_topic,omitempty"`
Schedule Schedule `json:"schedule,omitempty"`
Status Status `json:"status"`
CreatedAt time.Time `json:"created_at,omitempty"`
CreatedBy string `json:"created_by,omitempty"`
UpdatedAt time.Time `json:"updated_at,omitempty"`
UpdatedBy string `json:"updated_by,omitempty"`
}
+10 -194
View File
@@ -12,46 +12,8 @@ import (
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/messaging"
lua "github.com/yuin/gopher-lua"
)
const (
hoursInDay = 24
daysInWeek = 7
monthsInYear = 12
publisher = "magistrala.re"
)
var ErrInvalidRecurringType = errors.New("invalid recurring type")
type (
ScriptType uint
Metadata map[string]interface{}
Script struct {
Type ScriptType `json:"type"`
Value string `json:"value"`
}
)
type Rule struct {
ID string `json:"id"`
Name string `json:"name"`
DomainID string `json:"domain"`
Metadata Metadata `json:"metadata,omitempty"`
InputChannel string `json:"input_channel"`
InputTopic string `json:"input_topic"`
Logic Script `json:"logic"`
OutputChannel string `json:"output_channel,omitempty"`
OutputTopic string `json:"output_topic,omitempty"`
Schedule Schedule `json:"schedule,omitempty"`
Status Status `json:"status"`
CreatedAt time.Time `json:"created_at,omitempty"`
CreatedBy string `json:"created_by,omitempty"`
UpdatedAt time.Time `json:"updated_at,omitempty"`
UpdatedBy string `json:"updated_by,omitempty"`
}
type Repository interface {
AddRule(ctx context.Context, r Rule) (Rule, error)
ViewRule(ctx context.Context, id string) (Rule, error)
@@ -70,7 +32,7 @@ type PageMeta struct {
Dir string `json:"dir" db:"dir"`
Name string `json:"name" db:"name"`
InputChannel string `json:"input_channel,omitempty" db:"input_channel"`
InputTopic string `json:"input_topic,omitempty" db:"input_topic"`
InputTopic *string `json:"input_topic,omitempty" db:"input_topic"`
OutputChannel string `json:"output_channel,omitempty" db:"output_channel"`
Status Status `json:"status,omitempty" db:"status"`
Domain string `json:"domain_id,omitempty" db:"domain_id"`
@@ -97,28 +59,27 @@ type Service interface {
EnableRule(ctx context.Context, session authn.Session, id string) (Rule, error)
DisableRule(ctx context.Context, session authn.Session, id string) (Rule, error)
StartScheduler(ctx context.Context) error
Errors() <-chan error
}
type re struct {
writersPub messaging.Publisher
alarmsPub messaging.Publisher
rePubSub messaging.PubSub
idp supermq.IDProvider
repo Repository
errors chan error
idp supermq.IDProvider
rePubSub messaging.PubSub
writersPub messaging.Publisher
alarmsPub messaging.Publisher
ticker Ticker
email Emailer
}
func NewService(repo Repository, idp supermq.IDProvider, rePubSub messaging.PubSub, writersPub, alarmsPub messaging.Publisher, tck Ticker, emailer Emailer) Service {
func NewService(repo Repository, errors chan (error), idp supermq.IDProvider, rePubSub messaging.PubSub, writersPub, alarmsPub messaging.Publisher, tck Ticker, emailer Emailer) Service {
return &re{
writersPub: writersPub,
alarmsPub: alarmsPub,
rePubSub: rePubSub,
repo: repo,
idp: idp,
errors: make(chan error),
errors: errors,
rePubSub: rePubSub,
writersPub: writersPub,
alarmsPub: alarmsPub,
ticker: tck,
email: emailer,
}
@@ -220,30 +181,6 @@ func (re *re) DisableRule(ctx context.Context, session authn.Session, id string)
return rule, nil
}
func (re *re) Handle(msg *messaging.Message) error {
inputChannel := msg.Channel
pm := PageMeta{
InputChannel: inputChannel,
Status: EnabledStatus,
InputTopic: msg.Subtopic,
}
ctx := context.Background()
page, err := re.repo.ListRules(ctx, pm)
if err != nil {
return err
}
for _, r := range page.Rules {
go func(ctx context.Context) {
if err := re.process(ctx, r, msg); err != nil {
re.errors <- err
}
}(ctx)
}
return nil
}
func (re *re) Cancel() error {
return nil
}
@@ -251,124 +188,3 @@ func (re *re) Cancel() error {
func (re *re) Errors() <-chan error {
return re.errors
}
func (re *re) process(ctx context.Context, r Rule, msg *messaging.Message) error {
l := lua.NewState()
defer l.Close()
preload(l)
message := prepareMsg(l, msg)
// Set the message object as a Lua global variable.
l.SetGlobal("message", message)
// set the email function as a Lua global function.
l.SetGlobal("send_email", l.NewFunction(re.sendEmail))
l.SetGlobal("save_senml", l.NewFunction(re.save(ctx, msg)))
l.SetGlobal("send_alarm", l.NewFunction(re.sendAlarm(ctx, r.ID, msg)))
if err := l.DoString(string(r.Logic.Value)); err != nil {
return err
}
result := l.Get(-1) // Get the last result.
switch result {
case lua.LNil:
return nil
default:
if r.OutputChannel == "" {
return nil
}
m := &messaging.Message{
Publisher: publisher,
Created: time.Now().Unix(),
Payload: []byte(result.String()),
Channel: r.OutputChannel,
Domain: r.DomainID,
Subtopic: r.OutputTopic,
}
return re.rePubSub.Publish(ctx, m.Channel, m)
}
}
func (re *re) StartScheduler(ctx context.Context) error {
defer re.ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-re.ticker.Tick():
startTime := time.Now()
pm := PageMeta{
Status: EnabledStatus,
ScheduledBefore: &startTime,
}
page, err := re.repo.ListRules(ctx, pm)
if err != nil {
return err
}
for _, rule := range page.Rules {
if rule.shouldRun(startTime) {
go func(r Rule) {
msg := &messaging.Message{
Channel: r.InputChannel,
Created: startTime.Unix(),
}
re.errors <- re.process(ctx, r, msg)
}(rule)
}
}
}
}
}
func (r Rule) shouldRun(startTime time.Time) bool {
// Don't run if the rule's start time is in the future
// This allows scheduling rules to start at a specific future time
if r.Schedule.StartDateTime.After(startTime) {
return false
}
t := r.Schedule.Time.Truncate(time.Minute).UTC()
startTimeOnly := time.Date(0, 1, 1, startTime.Hour(), startTime.Minute(), 0, 0, time.UTC)
if t.Equal(startTimeOnly) {
return true
}
if r.Schedule.RecurringPeriod == 0 {
return false
}
period := int(r.Schedule.RecurringPeriod)
switch r.Schedule.Recurring {
case Daily:
if r.Schedule.RecurringPeriod > 0 {
daysSinceStart := startTime.Sub(r.Schedule.StartDateTime).Hours() / hoursInDay
if int(daysSinceStart)%period == 0 {
return true
}
}
case Weekly:
if r.Schedule.RecurringPeriod > 0 {
weeksSinceStart := startTime.Sub(r.Schedule.StartDateTime).Hours() / (hoursInDay * daysInWeek)
if int(weeksSinceStart)%period == 0 {
return true
}
}
case Monthly:
if r.Schedule.RecurringPeriod > 0 {
monthsSinceStart := (startTime.Year()-r.Schedule.StartDateTime.Year())*monthsInYear +
int(startTime.Month()-r.Schedule.StartDateTime.Month())
if monthsSinceStart%period == 0 {
return true
}
}
}
return false
}
+16 -12
View File
@@ -56,17 +56,17 @@ var (
}
)
func newService(t *testing.T) (re.Service, *mocks.Repository, *pubsubmocks.PubSub, *mocks.Ticker) {
func newService(t *testing.T, errs chan error) (re.Service, *mocks.Repository, *pubsubmocks.PubSub, *mocks.Ticker) {
repo := new(mocks.Repository)
mockTicker := new(mocks.Ticker)
idProvider := uuid.NewMock()
pubsub := pubsubmocks.NewPubSub(t)
e := new(mocks.Emailer)
return re.NewService(repo, idProvider, pubsub, pubsub, pubsub, mockTicker, e), repo, pubsub, mockTicker
return re.NewService(repo, errs, idProvider, pubsub, pubsub, pubsub, mockTicker, e), repo, pubsub, mockTicker
}
func TestAddRule(t *testing.T) {
svc, repo, _, _ := newService(t)
svc, repo, _, _ := newService(t, make(chan error))
ruleName := namegen.Generate()
now := time.Now().Add(time.Hour)
cases := []struct {
@@ -141,7 +141,7 @@ func TestAddRule(t *testing.T) {
}
func TestViewRule(t *testing.T) {
svc, repo, _, _ := newService(t)
svc, repo, _, _ := newService(t, make(chan error))
now := time.Now().Add(time.Hour)
cases := []struct {
@@ -199,7 +199,7 @@ func TestViewRule(t *testing.T) {
}
func TestUpdateRule(t *testing.T) {
svc, repo, _, _ := newService(t)
svc, repo, _, _ := newService(t, make(chan error))
newName := namegen.Generate()
now := time.Now().Add(time.Hour)
@@ -284,7 +284,7 @@ func TestUpdateRule(t *testing.T) {
}
func TestListRules(t *testing.T) {
svc, repo, _, _ := newService(t)
svc, repo, _, _ := newService(t, make(chan error))
numRules := 50
now := time.Now().Add(time.Hour)
var rules []re.Rule
@@ -389,7 +389,7 @@ func TestListRules(t *testing.T) {
}
func TestRemoveRule(t *testing.T) {
svc, repo, _, _ := newService(t)
svc, repo, _, _ := newService(t, make(chan error))
cases := []struct {
desc string
@@ -429,7 +429,7 @@ func TestRemoveRule(t *testing.T) {
}
func TestEnableRule(t *testing.T) {
svc, repo, _, _ := newService(t)
svc, repo, _, _ := newService(t, make(chan error))
cases := []struct {
desc string
@@ -484,7 +484,7 @@ func TestEnableRule(t *testing.T) {
}
func TestDisableRule(t *testing.T) {
svc, repo, _, _ := newService(t)
svc, repo, _, _ := newService(t, make(chan error))
cases := []struct {
desc string
@@ -539,8 +539,9 @@ func TestDisableRule(t *testing.T) {
}
func TestHandle(t *testing.T) {
svc, repo, pubmocks, _ := newService(t)
svc, repo, pubmocks, _ := newService(t, make(chan error))
now := time.Now()
empty := ""
cases := []struct {
desc string
@@ -559,6 +560,7 @@ func TestHandle(t *testing.T) {
},
pageMeta: re.PageMeta{
InputChannel: inputChannel,
InputTopic: &empty,
Status: re.EnabledStatus,
},
page: re.Page{
@@ -575,6 +577,7 @@ func TestHandle(t *testing.T) {
pageMeta: re.PageMeta{
InputChannel: inputChannel,
Status: re.EnabledStatus,
InputTopic: &empty,
},
page: re.Page{
Rules: []re.Rule{
@@ -619,7 +622,8 @@ func TestHandle(t *testing.T) {
func TestStartScheduler(t *testing.T) {
now := time.Now().Truncate(time.Minute)
svc, repo, _, ticker := newService(t)
errs := make(chan error)
svc, repo, _, ticker := newService(t, errs)
noRecurringPeriod := re.Rule{
ID: testsutil.GenerateUUID(t),
@@ -834,7 +838,7 @@ func TestStartScheduler(t *testing.T) {
case "start scheduler with list error":
tickChan <- time.Now()
time.Sleep(100 * time.Millisecond)
if err := svc.Errors(); err != nil {
if err := errs; err != nil {
cancel()
}
default: