MG-820 - RE outputs handling (#213)

* Fix: Update send alarm output handling

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* Fix: Update send email output

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* Fix: Update remote posgres output handling

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* Fix: Update keys for result and message to constants

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* Fix: Remove unused param

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* Fix: Remove test for query by output channel

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* Fix: Separate outputs

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* Fix: Revert docker compose changes

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* Update mocks

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* Fix: Update license header

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* Fix: Update output type

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* Fix: Update handling outputs

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* Fix: Update test outputs

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* Fix: rename publisher to channel

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* Fix: update imports

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* Fix: Update unmarshal outputs

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* NOISSUE - Fix JSON marshalling and unmarshalling (#1)

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix: Update outputs

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* Fix: update the templating

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* Fix: update how we marshal alarms

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

---------

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>
Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>
Co-authored-by: Dušan Borovčanin <borovcanindusan1@gmail.com>
This commit is contained in:
Ian Ngethe Muchiri
2025-06-26 18:35:46 +03:00
committed by GitHub
parent 88d2ef3257
commit 0d48e4ecdd
20 changed files with 586 additions and 334 deletions
-20
View File
@@ -497,26 +497,6 @@ func TestListRulesEndpoint(t *testing.T) {
status: http.StatusBadRequest,
err: apiutil.ErrInvalidQueryParams,
},
{
desc: "list rules with output channel",
domainID: domainID,
token: validToken,
listRulesResponse: re.Page{
Total: 1,
Rules: []re.Rule{rule},
},
query: "output_channel=output.channel",
status: http.StatusOK,
err: nil,
},
{
desc: "list rules with duplicate output channel",
domainID: domainID,
token: validToken,
query: "output_channel=1&output_channel=2",
status: http.StatusBadRequest,
err: apiutil.ErrInvalidQueryParams,
},
{
desc: "list rules with duplicate tags",
domainID: domainID,
+13 -19
View File
@@ -23,13 +23,12 @@ import (
)
const (
ruleIdKey = "ruleID"
reportIdKey = "reportID"
inputChannelKey = "input_channel"
outputChannelKey = "output_channel"
statusKey = "status"
actionKey = "action"
defAction = "view"
ruleIdKey = "ruleID"
reportIdKey = "reportID"
inputChannelKey = "input_channel"
statusKey = "status"
actionKey = "action"
defAction = "view"
)
// MakeHandler creates an HTTP handler for the service endpoints.
@@ -197,10 +196,6 @@ func decodeListRulesRequest(_ context.Context, r *http.Request) (interface{}, er
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
oc, err := apiutil.ReadStringQuery(r, outputChannelKey, "")
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
s, err := apiutil.ReadStringQuery(r, api.StatusKey, api.DefStatus)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
@@ -219,14 +214,13 @@ func decodeListRulesRequest(_ context.Context, r *http.Request) (interface{}, er
}
return listRulesReq{
PageMeta: re.PageMeta{
Offset: offset,
Limit: limit,
Name: name,
InputChannel: ic,
OutputChannel: oc,
Status: st,
Dir: dir,
Tag: tag,
Offset: offset,
Limit: limit,
Name: name,
InputChannel: ic,
Status: st,
Dir: dir,
Tag: tag,
},
}, nil
}
+5 -136
View File
@@ -4,19 +4,17 @@
package re
import (
"bytes"
"context"
"encoding/gob"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/absmach/magistrala/alarms"
"github.com/absmach/senml"
"github.com/absmach/supermq/pkg/messaging"
lua "github.com/yuin/gopher-lua"
)
const (
MsgKey = "message"
LogicRespKey = "result"
)
func luaEncrypt(l *lua.LState) int {
key, iv, data, err := decodeParams(l)
if err != nil {
@@ -78,132 +76,3 @@ func decodeParams(l *lua.LState) (key, iv, data []byte, err error) {
return key, iv, data, nil
}
func (re *re) sendEmail(l *lua.LState) int {
recipientsTable := l.ToTable(1)
subject := l.ToString(2)
content := l.ToString(3)
var recipients []string
recipientsTable.ForEach(func(_, value lua.LValue) {
if str, ok := value.(lua.LString); ok {
recipients = append(recipients, string(str))
}
})
if err := re.email.SendEmailNotification(recipients, "", subject, "", "", content, "", make(map[string][]byte)); err != nil {
return 0
}
return 1
}
func (re *re) sendAlarm(ctx context.Context, ruleID string, original *messaging.Message) lua.LGFunction {
return func(l *lua.LState) int {
processAlarm := func(alarmTable *lua.LTable) int {
val := convertLua(alarmTable)
data, err := json.Marshal(val)
if err != nil {
return 0
}
alarm := alarms.Alarm{
RuleID: ruleID,
DomainID: original.Domain,
ClientID: original.Publisher,
ChannelID: original.Channel,
Subtopic: original.Subtopic,
}
if err := json.Unmarshal(data, &alarm); err != nil {
return 0
}
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(alarm); err != nil {
return 0
}
m := &messaging.Message{
Domain: original.Domain,
Publisher: original.Publisher,
Created: original.Created,
Channel: original.Channel,
Subtopic: original.Subtopic,
Protocol: original.Protocol,
Payload: buf.Bytes(),
}
topic := messaging.EncodeMessageTopic(original)
if err := re.alarmsPub.Publish(ctx, topic, m); err != nil {
return 0
}
return 1
}
table := l.ToTable(1)
if table.RawGetInt(1) != lua.LNil {
table.ForEach(func(_, value lua.LValue) {
if alarmTable, ok := value.(*lua.LTable); ok {
processAlarm(alarmTable)
}
})
} else {
processAlarm(table)
}
return 1
}
}
func (re *re) saveSenml(ctx context.Context, val interface{}, msg *messaging.Message) error {
// In case there is a single SenML value, convert to slice so we can decode.
if _, ok := val.([]any); !ok {
val = []any{val}
}
data, err := json.Marshal(val)
if err != nil {
return err
}
if _, err := senml.Decode(data, senml.JSON); err != nil {
return err
}
m := &messaging.Message{
Domain: msg.Domain,
Publisher: msg.Publisher,
Created: msg.Created,
Channel: msg.Channel,
Subtopic: msg.Subtopic,
Protocol: msg.Protocol,
Payload: data,
}
topic := messaging.EncodeMessageTopic(msg)
if err := re.writersPub.Publish(ctx, topic, m); err != nil {
return err
}
return nil
}
func (re *re) publishChannel(ctx context.Context, val interface{}, channel, subtopic string, msg *messaging.Message) error {
data, err := json.Marshal(val)
if err != nil {
return err
}
m := &messaging.Message{
Domain: msg.Domain,
Publisher: msg.Publisher,
Created: msg.Created,
Channel: channel,
Subtopic: subtopic,
Protocol: protocol,
Payload: data,
}
topic := messaging.EncodeTopicSuffix(msg.Domain, channel, subtopic)
if err := re.rePubSub.Publish(ctx, topic, m); err != nil {
return err
}
return nil
}
+2 -1
View File
@@ -33,7 +33,8 @@ func (re *re) processGo(ctx context.Context, details []slog.Attr, r Rule, msg *m
if err != nil {
return pkglog.RunInfo{Level: slog.LevelError, Details: details, Message: err.Error()}
}
for _, o := range r.Logic.Outputs {
for _, o := range r.Outputs {
if res.Kind() == reflect.Bool && !res.Bool() {
return pkglog.RunInfo{Level: slog.LevelInfo, Message: "logic returned false", Details: details}
}
+21 -12
View File
@@ -12,6 +12,7 @@ import (
"time"
pkglog "github.com/absmach/magistrala/pkg/logger"
"github.com/absmach/magistrala/re/outputs"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/messaging"
)
@@ -24,6 +25,7 @@ var (
const (
maxPayload = 100 * 1024
pldExceededFmt = "max payload size of 100kB exceeded: "
protocol = "nats"
)
func (re *re) Handle(msg *messaging.Message) error {
@@ -92,19 +94,26 @@ func (re *re) process(ctx context.Context, r Rule, msg *messaging.Message) pkglo
}
}
func (re *re) handleOutput(ctx context.Context, o ScriptOutput, r Rule, msg *messaging.Message, val interface{}) error {
switch o {
case Channels:
if r.OutputChannel == "" {
return nil
}
return re.publishChannel(ctx, val, r.OutputChannel, r.OutputTopic, msg)
case SaveSenML:
return re.saveSenml(ctx, val, msg)
case Email:
break
func (re *re) handleOutput(ctx context.Context, o Runnable, r Rule, msg *messaging.Message, val interface{}) error {
switch o := o.(type) {
case *outputs.Alarm:
o.AlarmsPub = re.alarmsPub
o.RuleID = r.ID
return o.Run(ctx, msg, val)
case *outputs.Email:
o.Emailer = re.email
return o.Run(ctx, msg, val)
case *outputs.Postgres:
return o.Run(ctx, msg, val)
case *outputs.ChannelPublisher:
o.RePubSub = re.rePubSub
return o.Run(ctx, msg, val)
case *outputs.SenML:
o.WritersPub = re.writersPub
return o.Run(ctx, msg, val)
default:
return fmt.Errorf("unknown output type: %T", o)
}
return nil
}
func (re *re) StartScheduler(ctx context.Context) error {
+4 -5
View File
@@ -41,8 +41,6 @@ func (re *re) processLua(ctx context.Context, details []slog.Attr, r Rule, msg *
l.SetGlobal("message", message)
// Set binding functions as a Lua global functions.
l.SetGlobal("send_email", l.NewFunction(re.sendEmail))
l.SetGlobal("send_alarm", l.NewFunction(re.sendAlarm(ctx, r.ID, msg)))
l.SetGlobal("aes_encrypt", l.NewFunction(luaEncrypt))
l.SetGlobal("aes_decrypt", l.NewFunction(luaDecrypt))
@@ -56,12 +54,13 @@ func (re *re) processLua(ctx context.Context, details []slog.Attr, r Rule, msg *
}
// Converting Lua is an expensive operation, so
// don't do it if there are no outputs.
if len(r.Logic.Outputs) == 0 {
return pkglog.RunInfo{Level: slog.LevelWarn, Message: "rule with no output channels", Details: details}
if len(r.Outputs) == 0 {
return pkglog.RunInfo{Level: slog.LevelWarn, Message: "rule with no outputs", Details: details}
}
var err error
res := convertLua(result)
for _, o := range r.Logic.Outputs {
for _, o := range r.Outputs {
// If value is false, don't run the follow-up.
if v, ok := res.(bool); ok && !v {
return pkglog.RunInfo{Level: slog.LevelInfo, Message: "logic returned false", Details: details}
+10 -10
View File
@@ -561,8 +561,8 @@ func (_c *Service_UpdateRuleSchedule_Call) RunAndReturn(run func(ctx context.Con
}
// UpdateRuleTags provides a mock function for the type Service
func (_mock *Service) UpdateRuleTags(ctx context.Context, session authn.Session, channel re.Rule) (re.Rule, error) {
ret := _mock.Called(ctx, session, channel)
func (_mock *Service) UpdateRuleTags(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) {
ret := _mock.Called(ctx, session, r)
if len(ret) == 0 {
panic("no return value specified for UpdateRuleTags")
@@ -571,15 +571,15 @@ func (_mock *Service) UpdateRuleTags(ctx context.Context, session authn.Session,
var r0 re.Rule
var r1 error
if returnFunc, ok := ret.Get(0).(func(context.Context, authn.Session, re.Rule) (re.Rule, error)); ok {
return returnFunc(ctx, session, channel)
return returnFunc(ctx, session, r)
}
if returnFunc, ok := ret.Get(0).(func(context.Context, authn.Session, re.Rule) re.Rule); ok {
r0 = returnFunc(ctx, session, channel)
r0 = returnFunc(ctx, session, r)
} else {
r0 = ret.Get(0).(re.Rule)
}
if returnFunc, ok := ret.Get(1).(func(context.Context, authn.Session, re.Rule) error); ok {
r1 = returnFunc(ctx, session, channel)
r1 = returnFunc(ctx, session, r)
} else {
r1 = ret.Error(1)
}
@@ -594,12 +594,12 @@ type Service_UpdateRuleTags_Call struct {
// UpdateRuleTags is a helper method to define mock.On call
// - ctx
// - session
// - channel
func (_e *Service_Expecter) UpdateRuleTags(ctx interface{}, session interface{}, channel interface{}) *Service_UpdateRuleTags_Call {
return &Service_UpdateRuleTags_Call{Call: _e.mock.On("UpdateRuleTags", ctx, session, channel)}
// - r
func (_e *Service_Expecter) UpdateRuleTags(ctx interface{}, session interface{}, r interface{}) *Service_UpdateRuleTags_Call {
return &Service_UpdateRuleTags_Call{Call: _e.mock.On("UpdateRuleTags", ctx, session, r)}
}
func (_c *Service_UpdateRuleTags_Call) Run(run func(ctx context.Context, session authn.Session, channel re.Rule)) *Service_UpdateRuleTags_Call {
func (_c *Service_UpdateRuleTags_Call) Run(run func(ctx context.Context, session authn.Session, r re.Rule)) *Service_UpdateRuleTags_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(authn.Session), args[2].(re.Rule))
})
@@ -611,7 +611,7 @@ func (_c *Service_UpdateRuleTags_Call) Return(rule re.Rule, err error) *Service_
return _c
}
func (_c *Service_UpdateRuleTags_Call) RunAndReturn(run func(ctx context.Context, session authn.Session, channel re.Rule) (re.Rule, error)) *Service_UpdateRuleTags_Call {
func (_c *Service_UpdateRuleTags_Call) RunAndReturn(run func(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error)) *Service_UpdateRuleTags_Call {
_c.Call.Return(run)
return _c
}
+78
View File
@@ -0,0 +1,78 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package outputs
import (
"bytes"
"context"
"encoding/gob"
"encoding/json"
"github.com/absmach/magistrala/alarms"
"github.com/absmach/supermq/pkg/messaging"
)
type Alarm struct {
AlarmsPub messaging.Publisher `json:"-"`
RuleID string `json:"rule_id"`
}
func (a *Alarm) Run(ctx context.Context, msg *messaging.Message, val interface{}) error {
data, err := json.Marshal(val)
if err != nil {
return err
}
var alarmsList []alarms.Alarm
if err := json.Unmarshal(data, &alarmsList); err != nil {
var single alarms.Alarm
if err := json.Unmarshal(data, &single); err != nil {
return err
}
alarmsList = []alarms.Alarm{single}
}
for _, alarm := range alarmsList {
if err := a.processAlarm(ctx, msg, alarm); err != nil {
return err
}
}
return nil
}
func (a *Alarm) processAlarm(ctx context.Context, msg *messaging.Message, alarm alarms.Alarm) error {
alarm.RuleID = a.RuleID
alarm.DomainID = msg.Domain
alarm.ClientID = msg.Publisher
alarm.ChannelID = msg.Channel
alarm.Subtopic = msg.Subtopic
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(alarm); err != nil {
return err
}
m := &messaging.Message{
Domain: msg.Domain,
Publisher: msg.Publisher,
Created: msg.Created,
Channel: msg.Channel,
Subtopic: msg.Subtopic,
Protocol: msg.Protocol,
Payload: buf.Bytes(),
}
topic := messaging.EncodeMessageTopic(msg)
if err := a.AlarmsPub.Publish(ctx, topic, m); err != nil {
return err
}
return nil
}
func (a *Alarm) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]any{
"type": AlarmsType.String(),
})
}
+51
View File
@@ -0,0 +1,51 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package outputs
import (
"context"
"encoding/json"
"github.com/absmach/supermq/pkg/messaging"
)
const protocol = "nats"
type ChannelPublisher struct {
RePubSub messaging.PubSub `json:"-"`
Channel string `json:"channel"`
Topic string `json:"topic"`
}
func (p *ChannelPublisher) Run(ctx context.Context, msg *messaging.Message, val interface{}) error {
data, err := json.Marshal(val)
if err != nil {
return err
}
m := &messaging.Message{
Domain: msg.Domain,
Publisher: msg.Publisher,
Created: msg.Created,
Channel: p.Channel,
Subtopic: p.Topic,
Protocol: protocol,
Payload: data,
}
topic := messaging.EncodeTopicSuffix(msg.Domain, p.Channel, p.Topic)
if err := p.RePubSub.Publish(ctx, topic, m); err != nil {
return err
}
return nil
}
func (cp *ChannelPublisher) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]string{
"type": ChannelsType.String(),
"channel": cp.Channel,
"topic": cp.Topic,
})
}
+4
View File
@@ -0,0 +1,4 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package outputs
+54
View File
@@ -0,0 +1,54 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package outputs
import (
"bytes"
"context"
"encoding/json"
"text/template"
"github.com/absmach/magistrala/pkg/emailer"
"github.com/absmach/supermq/pkg/messaging"
)
type Email struct {
To []string `json:"to"`
Subject string `json:"subject"`
Content string `json:"content"`
Emailer emailer.Emailer `json:"-"`
}
func (e *Email) Run(ctx context.Context, msg *messaging.Message, val interface{}) error {
templData := templateVal{
Message: msg,
Result: val,
}
tmpl, err := template.New("email").Parse(e.Content)
if err != nil {
return err
}
var output bytes.Buffer
if err := tmpl.Execute(&output, templData); err != nil {
return err
}
content := output.String()
if err := e.Emailer.SendEmailNotification(e.To, "", e.Subject, "", "", content, "", make(map[string][]byte)); err != nil {
return err
}
return nil
}
func (e *Email) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]any{
"type": EmailType.String(),
"to": e.To,
"subject": e.Subject,
"content": e.Content,
})
}
+66
View File
@@ -0,0 +1,66 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package outputs
import (
"encoding/json"
"strings"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/messaging"
)
type templateVal struct {
Message *messaging.Message
Result interface{}
}
// OutputType is the indicator for type of the output
// so we can move it to the Go instead calling Go from Lua.
type OutputType uint
const (
ChannelsType OutputType = iota
AlarmsType
SaveSenMLType
EmailType
SaveRemotePgType
)
var (
scriptKindToString = [...]string{"channels", "alarms", "save_senml", "email", "save_remote_pg"}
stringToScriptKind = map[string]OutputType{
"channels": ChannelsType,
"alarms": AlarmsType,
"save_senml": SaveSenMLType,
"email": EmailType,
"save_remote_pg": SaveRemotePgType,
}
)
func (s OutputType) String() string {
if int(s) < 0 || int(s) >= len(scriptKindToString) {
return "unknown"
}
return scriptKindToString[s]
}
// MarshalJSON converts OutputType to JSON.
func (s *OutputType) MarshalJSON() ([]byte, error) {
return json.Marshal(s.String())
}
// UnmarshalJSON parses JSON string into OutputType.
func (s *OutputType) UnmarshalJSON(data []byte) error {
var str string
if err := json.Unmarshal(data, &str); err != nil {
return err
}
lower := strings.ToLower(str)
if val, ok := stringToScriptKind[lower]; ok {
*s = val
return nil
}
return errors.New("invalid OutputType: " + str)
}
+107
View File
@@ -0,0 +1,107 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package outputs
import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
"text/template"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/messaging"
_ "github.com/jackc/pgx/v5/stdlib" // required for SQL access
"github.com/jmoiron/sqlx"
)
type Postgres struct {
Host string `json:"host"`
Port int `json:"port"`
User string `json:"user"`
Password string `json:"password"`
Database string `json:"database"`
Table string `json:"table"`
Mapping string `json:"mapping"`
}
func (p *Postgres) Run(ctx context.Context, msg *messaging.Message, val interface{}) error {
templData := templateVal{
Message: msg,
Result: val,
}
tmpl, err := template.New("postgres").Parse(p.Mapping)
if err != nil {
return err
}
var output bytes.Buffer
if err := tmpl.Execute(&output, templData); err != nil {
return err
}
mapping := output.String()
var columns map[string]interface{}
if err = json.Unmarshal([]byte(mapping), &columns); err != nil {
return err
}
connStr := fmt.Sprintf(
"host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
p.Host, p.Port, p.User, p.Password, p.Database,
)
db, err := sqlx.Open("pgx", connStr)
if err != nil {
return err
}
defer db.Close()
if err := db.Ping(); err != nil {
return errors.Wrap(errors.New("failed to connect to DB"), err)
}
var (
cols []string
values []interface{}
placeholders []string
)
i := 1
for k, v := range columns {
cols = append(cols, k)
values = append(values, v)
placeholders = append(placeholders, fmt.Sprintf("$%d", i))
i++
}
q := fmt.Sprintf(
`INSERT INTO %s (%s) VALUES (%s)`,
p.Table,
strings.Join(cols, ", "),
strings.Join(placeholders, ", "),
)
_, err = db.Exec(q, values...)
if err != nil {
return errors.Wrap(errors.New("failed to insert data"), err)
}
return nil
}
func (p *Postgres) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]any{
"type": SaveRemotePgType.String(),
"host": p.Host,
"port": p.Port,
"user": p.User,
"password": p.Password,
"database": p.Database,
"table": p.Table,
"mapping": p.Mapping,
})
}
+52
View File
@@ -0,0 +1,52 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package outputs
import (
"context"
"encoding/json"
"github.com/absmach/senml"
"github.com/absmach/supermq/pkg/messaging"
)
type SenML struct {
WritersPub messaging.Publisher `json:"-"`
}
func (s *SenML) Run(ctx context.Context, msg *messaging.Message, val interface{}) error {
// In case there is a single SenML value, convert to slice so we can decode.
if _, ok := val.([]any); !ok {
val = []any{val}
}
data, err := json.Marshal(val)
if err != nil {
return err
}
if _, err := senml.Decode(data, senml.JSON); err != nil {
return err
}
m := &messaging.Message{
Domain: msg.Domain,
Publisher: msg.Publisher,
Created: msg.Created,
Channel: msg.Channel,
Subtopic: msg.Subtopic,
Protocol: msg.Protocol,
Payload: data,
}
topic := messaging.EncodeMessageTopic(msg)
if err := s.WritersPub.Publish(ctx, topic, m); err != nil {
return err
}
return nil
}
func (senml *SenML) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]string{
"type": SaveSenMLType.String(),
})
}
+1 -3
View File
@@ -27,11 +27,9 @@ func Migration() *migrate.MemoryMigrationSource {
updated_by VARCHAR(254),
input_channel VARCHAR(36),
input_topic TEXT,
output_channel VARCHAR(36),
output_topic TEXT,
outputs JSONB,
status SMALLINT NOT NULL DEFAULT 0 CHECK (status >= 0),
logic_type SMALLINT NOT NULL DEFAULT 0 CHECK (logic_type >= 0),
logic_output SMALLINT[] NOT NULL DEFAULT '{}',
logic_value BYTEA,
time TIMESTAMP,
recurring SMALLINT,
+21 -31
View File
@@ -26,12 +26,12 @@ func NewRepository(db postgres.Database) re.Repository {
func (repo *PostgresRepository) AddRule(ctx context.Context, r re.Rule) (re.Rule, error) {
q := `
INSERT INTO rules (id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status)
VALUES (:id, :name, :domain_id, :tags, :metadata, :input_channel, :input_topic, :logic_type, :logic_output, :logic_value,
:output_channel, :output_topic, :start_datetime, :time, :recurring, :recurring_period, :created_at, :created_by, :updated_at, :updated_by, :status)
RETURNING id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;
INSERT INTO rules (id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_value,
outputs, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status)
VALUES (:id, :name, :domain_id, :tags, :metadata, :input_channel, :input_topic, :logic_type, :logic_value,
:outputs, :start_datetime, :time, :recurring, :recurring_period, :created_at, :created_by, :updated_at, :updated_by, :status)
RETURNING id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_value,
outputs, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;
`
dbr, err := ruleToDb(r)
if err != nil {
@@ -60,8 +60,8 @@ func (repo *PostgresRepository) AddRule(ctx context.Context, r re.Rule) (re.Rule
func (repo *PostgresRepository) ViewRule(ctx context.Context, id string) (re.Rule, error) {
q := `
SELECT id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_output, logic_value, output_channel,
output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status
SELECT id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_value, outputs,
start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status
FROM rules
WHERE id = $1;
`
@@ -85,8 +85,8 @@ func (repo *PostgresRepository) UpdateRuleStatus(ctx context.Context, r re.Rule)
q := `UPDATE rules
SET status = :status, updated_at = :updated_at, updated_by = :updated_by
WHERE id = :id
RETURNING id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;`
RETURNING id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_value,
outputs, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;`
return repo.update(ctx, r, q)
}
@@ -106,15 +106,11 @@ func (repo *PostgresRepository) UpdateRule(ctx context.Context, r re.Rule) (re.R
if r.InputTopic != "" {
query = append(query, "input_topic = :input_topic,")
}
if r.OutputChannel != "" {
query = append(query, "output_channel = :output_channel,")
}
if r.OutputTopic != "" {
query = append(query, "output_topic = :output_topic,")
if r.Outputs != nil {
query = append(query, "outputs = :outputs, ")
}
if r.Logic.Value != "" {
query = append(query, "logic_type = :logic_type,")
query = append(query, "logic_output = :logic_output,")
query = append(query, "logic_value = :logic_value,")
}
@@ -125,8 +121,8 @@ func (repo *PostgresRepository) UpdateRule(ctx context.Context, r re.Rule) (re.R
q := fmt.Sprintf(`
UPDATE rules
SET %s updated_at = :updated_at, updated_by = :updated_by WHERE id = :id
RETURNING id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;
RETURNING id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_value,
outputs, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;
`, upq)
return repo.update(ctx, r, q)
@@ -135,7 +131,7 @@ func (repo *PostgresRepository) UpdateRule(ctx context.Context, r re.Rule) (re.R
func (repo *PostgresRepository) UpdateRuleTags(ctx context.Context, r re.Rule) (re.Rule, error) {
q := `UPDATE rules SET tags = :tags, updated_at = :updated_at, updated_by = :updated_by
WHERE id = :id AND status = :status
RETURNING id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
RETURNING id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_value,
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;`
r.Status = re.EnabledStatus
@@ -147,8 +143,8 @@ func (repo *PostgresRepository) UpdateRuleSchedule(ctx context.Context, r re.Rul
UPDATE rules
SET start_datetime = :start_datetime, time = :time, recurring = :recurring,
recurring_period = :recurring_period, updated_at = :updated_at, updated_by = :updated_by WHERE id = :id
RETURNING id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;
RETURNING id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_value,
outputs, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;
`
return repo.update(ctx, r, q)
}
@@ -210,8 +206,8 @@ func (repo *PostgresRepository) ListRules(ctx context.Context, pm re.PageMeta) (
}
pq := pageRulesQuery(pm)
q := fmt.Sprintf(`
SELECT id, name, domain_id, tags, input_channel, input_topic, logic_type, logic_output, logic_value, output_channel,
output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status
SELECT id, name, domain_id, tags, input_channel, input_topic, logic_type, logic_value, outputs,
start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status
FROM rules r %s %s;
`, pq, pgData)
rows, err := repo.DB.NamedQueryContext(ctx, q, pm)
@@ -253,8 +249,8 @@ func (repo *PostgresRepository) UpdateRuleDue(ctx context.Context, id string, du
q := `
UPDATE rules
SET time = :time, updated_at = :updated_at WHERE id = :id
RETURNING id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;
RETURNING id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_value,
outputs, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;
`
dbr := dbRule{
ID: id,
@@ -289,12 +285,6 @@ func pageRulesQuery(pm re.PageMeta) string {
if pm.InputChannel != "" {
query = append(query, "r.input_channel = :input_channel")
}
if pm.InputTopic != nil {
query = append(query, "r.input_topic = :input_topic")
}
if pm.OutputChannel != "" {
query = append(query, "r.output_channel = :output_channel")
}
if pm.Status != re.AllStatus {
query = append(query, "r.status = :status")
}
+21 -20
View File
@@ -12,7 +12,6 @@ import (
"github.com/absmach/magistrala/re"
"github.com/absmach/supermq/pkg/errors"
"github.com/jackc/pgtype"
"github.com/lib/pq"
)
// dbRule represents the database structure for a Rule.
@@ -25,10 +24,8 @@ type dbRule struct {
InputChannel string `db:"input_channel"`
InputTopic sql.NullString `db:"input_topic"`
LogicType re.ScriptType `db:"logic_type"`
LogicOutputs pq.Int32Array `db:"logic_output"`
LogicValue string `db:"logic_value"`
OutputChannel sql.NullString `db:"output_channel"`
OutputTopic sql.NullString `db:"output_topic"`
Outputs []byte `db:"outputs"`
StartDateTime sql.NullTime `db:"start_datetime"`
Time sql.NullTime `db:"time"`
Recurring schedule.Recurring `db:"recurring"`
@@ -49,10 +46,7 @@ func ruleToDb(r re.Rule) (dbRule, error) {
}
metadata = b
}
lo := pq.Int32Array{}
for _, v := range r.Logic.Outputs {
lo = append(lo, int32(v))
}
start := sql.NullTime{}
if r.Schedule.StartDateTime != nil && !r.Schedule.StartDateTime.IsZero() {
start.Time = *r.Schedule.StartDateTime
@@ -66,6 +60,12 @@ func ruleToDb(r re.Rule) (dbRule, error) {
if err := tags.Set(r.Tags); err != nil {
return dbRule{}, err
}
outputs, err := json.Marshal(r.Outputs)
if err != nil {
return dbRule{}, errors.Wrap(errors.ErrMalformedEntity, err)
}
return dbRule{
ID: r.ID,
Name: r.Name,
@@ -75,10 +75,8 @@ func ruleToDb(r re.Rule) (dbRule, error) {
InputChannel: r.InputChannel,
InputTopic: toNullString(r.InputTopic),
LogicType: r.Logic.Type,
LogicOutputs: lo,
LogicValue: r.Logic.Value,
OutputChannel: toNullString(r.OutputChannel),
OutputTopic: toNullString(r.OutputTopic),
Outputs: outputs,
StartDateTime: start,
Time: t,
Recurring: r.Schedule.Recurring,
@@ -98,14 +96,19 @@ func dbToRule(dto dbRule) (re.Rule, error) {
return re.Rule{}, errors.Wrap(errors.ErrMalformedEntity, err)
}
}
lo := []re.ScriptOutput{}
for _, v := range dto.LogicOutputs {
lo = append(lo, re.ScriptOutput(v))
}
var tags []string
for _, e := range dto.Tags.Elements {
tags = append(tags, e.String)
}
var outputs re.Outputs
if dto.Outputs != nil {
if err := json.Unmarshal(dto.Outputs, &outputs); err != nil {
return re.Rule{}, errors.Wrap(errors.ErrMalformedEntity, err)
}
}
return re.Rule{
ID: dto.ID,
Name: dto.Name,
@@ -115,12 +118,10 @@ func dbToRule(dto dbRule) (re.Rule, error) {
InputChannel: dto.InputChannel,
InputTopic: fromNullString(dto.InputTopic),
Logic: re.Script{
Outputs: lo,
Type: dto.LogicType,
Value: dto.LogicValue,
Type: dto.LogicType,
Value: dto.LogicValue,
},
OutputChannel: fromNullString(dto.OutputChannel),
OutputTopic: fromNullString(dto.OutputTopic),
Outputs: outputs,
Schedule: schedule.Schedule{
StartDateTime: &dto.StartDateTime.Time,
Time: dto.Time.Time,
+65 -73
View File
@@ -6,10 +6,10 @@ package re
import (
"context"
"encoding/json"
"strings"
"time"
"github.com/absmach/magistrala/pkg/schedule"
"github.com/absmach/magistrala/re/outputs"
"github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/messaging"
@@ -20,88 +20,80 @@ const (
GoType
)
const protocol = "nats"
// ScriptOutput is the indicator for type of the logic
// so we can move it to the Go instead calling Go from Lua.
type ScriptOutput uint
const (
Channels ScriptOutput = iota
Alarms
SaveSenML
Email
SaveRemotePg
)
var (
scriptKindToString = [...]string{"channels", "alarms", "save_senml", "email", "save_remote_pg"}
stringToScriptKind = map[string]ScriptOutput{
"channels": Channels,
"alarms": Alarms,
"save_senml": SaveSenML,
"email": Email,
"save_remote_pg": SaveRemotePg,
}
)
func (s ScriptOutput) String() string {
if int(s) < 0 || int(s) >= len(scriptKindToString) {
return "unknown"
}
return scriptKindToString[s]
}
// MarshalJSON converts ScriptOutput to JSON.
func (s ScriptOutput) MarshalJSON() ([]byte, error) {
return json.Marshal(s.String())
}
// UnmarshalJSON parses JSON string into ScriptOutput.
func (s *ScriptOutput) UnmarshalJSON(data []byte) error {
var str string
if err := json.Unmarshal(data, &str); err != nil {
return err
}
lower := strings.ToLower(str)
if val, ok := stringToScriptKind[lower]; ok {
*s = val
return nil
}
return errors.New("invalid ScriptOutput: " + str)
}
type (
// ScriptType indicates Runtime type for the future versions
// that will support JS or Go runtimes alongside Lua.
ScriptType uint
Metadata map[string]interface{}
Script struct {
Type ScriptType `json:"type"`
Outputs []ScriptOutput `json:"outputs"`
Value string `json:"value"`
Script struct {
Type ScriptType `json:"type"`
Value string `json:"value"`
}
)
var outputRegistry = map[outputs.OutputType]func() Runnable{
outputs.AlarmsType: func() Runnable { return &outputs.Alarm{} },
outputs.EmailType: func() Runnable { return &outputs.Email{} },
outputs.SaveRemotePgType: func() Runnable { return &outputs.Postgres{} },
outputs.ChannelsType: func() Runnable { return &outputs.ChannelPublisher{} },
outputs.SaveSenMLType: func() Runnable { return &outputs.SenML{} },
}
type Rule struct {
ID string `json:"id"`
Name string `json:"name"`
DomainID string `json:"domain"`
Metadata Metadata `json:"metadata,omitempty"`
Tags []string `json:"tags,omitempty"`
InputChannel string `json:"input_channel"`
InputTopic string `json:"input_topic"`
Logic Script `json:"logic"`
OutputChannel string `json:"output_channel,omitempty"`
OutputTopic string `json:"output_topic,omitempty"`
Schedule schedule.Schedule `json:"schedule"`
Status Status `json:"status"`
CreatedAt time.Time `json:"created_at"`
CreatedBy string `json:"created_by"`
UpdatedAt time.Time `json:"updated_at"`
UpdatedBy string `json:"updated_by"`
ID string `json:"id"`
Name string `json:"name"`
DomainID string `json:"domain"`
Metadata Metadata `json:"metadata,omitempty"`
Tags []string `json:"tags,omitempty"`
InputChannel string `json:"input_channel"`
InputTopic string `json:"input_topic"`
Logic Script `json:"logic"`
Outputs Outputs `json:"outputs,omitempty"`
Schedule schedule.Schedule `json:"schedule"`
Status Status `json:"status"`
CreatedAt time.Time `json:"created_at"`
CreatedBy string `json:"created_by"`
UpdatedAt time.Time `json:"updated_at"`
UpdatedBy string `json:"updated_by"`
}
type Outputs []Runnable
func (o *Outputs) UnmarshalJSON(data []byte) error {
var rawList []json.RawMessage
if err := json.Unmarshal(data, &rawList); err != nil {
return err
}
var runnables []Runnable
for _, raw := range rawList {
var meta struct {
Type outputs.OutputType `json:"type"`
}
if err := json.Unmarshal(raw, &meta); err != nil {
return err
}
factory, ok := outputRegistry[meta.Type]
if !ok {
return errors.New("unknown output type: " + meta.Type.String())
}
instance := factory()
if err := json.Unmarshal(raw, instance); err != nil {
return err
}
runnables = append(runnables, instance)
}
v := Outputs(runnables)
*o = v
return nil
}
type Runnable interface {
Run(ctx context.Context, msg *messaging.Message, val interface{}) error
}
// PageMeta contains page metadata that helps navigation.
+9 -2
View File
@@ -15,6 +15,7 @@ import (
pkgSch "github.com/absmach/magistrala/pkg/schedule"
"github.com/absmach/magistrala/re"
"github.com/absmach/magistrala/re/mocks"
"github.com/absmach/magistrala/re/outputs"
readmocks "github.com/absmach/magistrala/readers/mocks"
"github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/errors"
@@ -595,6 +596,7 @@ func TestHandle(t *testing.T) {
svc, repo, pubmocks, _ := newService(t, make(chan pkglog.RunInfo))
now := time.Now()
scheduled := false
cases := []struct {
desc string
message *messaging.Message
@@ -630,8 +632,13 @@ func TestHandle(t *testing.T) {
Logic: re.Script{
Type: re.ScriptType(0),
},
OutputChannel: "output.channel",
Schedule: schedule,
Outputs: re.Outputs{
&outputs.ChannelPublisher{
Channel: "output.channel",
Topic: "output.topic",
},
},
Schedule: schedule,
},
},
},
+2 -2
View File
@@ -154,9 +154,9 @@ type ReportConfig struct {
Email *EmailSetting `json:"email,omitempty"`
Metrics []ReqMetric `json:"metrics,omitempty"`
Status Status `json:"status"`
CreatedAt time.Time `json:"created_at,omitempty"`
CreatedAt time.Time `json:"created_at"`
CreatedBy string `json:"created_by,omitempty"`
UpdatedAt time.Time `json:"updated_at,omitempty"`
UpdatedAt time.Time `json:"updated_at"`
UpdatedBy string `json:"updated_by,omitempty"`
}