mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 06:20:18 +00:00
NOISSUE - Add Golang support to RE (#161)
Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>
This commit is contained in:
@@ -33,6 +33,7 @@ require (
|
||||
github.com/spf13/cobra v1.9.1
|
||||
github.com/spf13/viper v1.20.1
|
||||
github.com/stretchr/testify v1.10.0
|
||||
github.com/traefik/yaegi v0.16.1
|
||||
github.com/vadv/gopher-lua-libs v0.6.0
|
||||
github.com/yuin/gopher-lua v1.1.1
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0
|
||||
|
||||
@@ -494,6 +494,8 @@ github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8
|
||||
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
|
||||
github.com/technoweenie/multipartstreamer v1.0.1 h1:XRztA5MXiR1TIRHxH2uNxXxaIkKQDeX7m2XsSOlQEnM=
|
||||
github.com/technoweenie/multipartstreamer v1.0.1/go.mod h1:jNVxdtShOxzAsukZwTSw6MDx5eUJoiEBsSvzDU9uzog=
|
||||
github.com/traefik/yaegi v0.16.1 h1:f1De3DVJqIDKmnasUF6MwmWv1dSEEat0wcpXhD2On3E=
|
||||
github.com/traefik/yaegi v0.16.1/go.mod h1:4eVhbPb3LnD2VigQjhYbEJ69vDRFdT2HQNrXx8eEwUY=
|
||||
github.com/urfave/cli v1.22.5/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
|
||||
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
|
||||
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
|
||||
|
||||
@@ -0,0 +1,50 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package re
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"reflect"
|
||||
|
||||
pkglog "github.com/absmach/magistrala/pkg/logger"
|
||||
"github.com/absmach/supermq/pkg/errors"
|
||||
"github.com/absmach/supermq/pkg/messaging"
|
||||
golang "github.com/traefik/yaegi/interp"
|
||||
"github.com/traefik/yaegi/stdlib"
|
||||
)
|
||||
|
||||
func (re *re) processGo(ctx context.Context, details []slog.Attr, r Rule, msg *messaging.Message) pkglog.RunInfo {
|
||||
i := golang.New(golang.Options{})
|
||||
if err := i.Use(stdlib.Symbols); err != nil {
|
||||
return pkglog.RunInfo{Level: slog.LevelError, Details: details, Message: err.Error()}
|
||||
}
|
||||
err := i.Use(map[string]map[string]reflect.Value{
|
||||
"main": {
|
||||
"message": reflect.ValueOf(&msg).Elem(),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return pkglog.RunInfo{Level: slog.LevelError, Details: details, Message: err.Error()}
|
||||
}
|
||||
res, err := i.Eval(r.Logic.Value)
|
||||
if err != nil {
|
||||
return pkglog.RunInfo{Level: slog.LevelError, Details: details, Message: err.Error()}
|
||||
}
|
||||
for _, o := range r.Logic.Outputs {
|
||||
if res.Kind() == reflect.Bool && !res.Bool() {
|
||||
return pkglog.RunInfo{Level: slog.LevelInfo, Message: "logic returned false", Details: details}
|
||||
}
|
||||
if e := re.handleOutput(ctx, o, r, msg, res); e != nil {
|
||||
err = errors.Wrap(e, err)
|
||||
}
|
||||
}
|
||||
ret := pkglog.RunInfo{Level: slog.LevelInfo, Details: details, Message: "rule processed successfully"}
|
||||
if err != nil {
|
||||
ret.Level = slog.LevelError
|
||||
ret.Message = fmt.Sprintf("failed to handle rule output: %s", err)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
+5
-44
@@ -14,7 +14,6 @@ import (
|
||||
pkglog "github.com/absmach/magistrala/pkg/logger"
|
||||
"github.com/absmach/supermq/pkg/errors"
|
||||
"github.com/absmach/supermq/pkg/messaging"
|
||||
lua "github.com/yuin/gopher-lua"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -79,56 +78,18 @@ func matchSubject(published, subscribed string) bool {
|
||||
}
|
||||
|
||||
func (re *re) process(ctx context.Context, r Rule, msg *messaging.Message) pkglog.RunInfo {
|
||||
l := lua.NewState()
|
||||
defer l.Close()
|
||||
preload(l)
|
||||
message := prepareMsg(l, msg)
|
||||
|
||||
// Set the message object as a Lua global variable.
|
||||
l.SetGlobal("message", message)
|
||||
|
||||
// Set binding functions as a Lua global functions.
|
||||
l.SetGlobal("send_email", l.NewFunction(re.sendEmail))
|
||||
l.SetGlobal("send_alarm", l.NewFunction(re.sendAlarm(ctx, r.ID, msg)))
|
||||
l.SetGlobal("aes_encrypt", l.NewFunction(luaEncrypt))
|
||||
l.SetGlobal("aes_decrypt", l.NewFunction(luaDecrypt))
|
||||
|
||||
details := []slog.Attr{
|
||||
slog.String("domain_id", r.DomainID),
|
||||
slog.String("rule_id", r.ID),
|
||||
slog.String("rule_name", r.Name),
|
||||
slog.Time("exec_time", time.Now().UTC()),
|
||||
}
|
||||
if err := l.DoString(r.Logic.Value); err != nil {
|
||||
return pkglog.RunInfo{Level: slog.LevelError, Message: fmt.Sprintf("failed to run rule logic: %s", err), Details: details}
|
||||
switch r.Logic.Type {
|
||||
case GoType:
|
||||
return re.processGo(ctx, details, r, msg)
|
||||
default:
|
||||
return re.processLua(ctx, details, r, msg)
|
||||
}
|
||||
// Get the last result.
|
||||
result := l.Get(-1)
|
||||
if result == lua.LNil {
|
||||
return pkglog.RunInfo{Level: slog.LevelWarn, Message: "rule with nil script result", Details: details}
|
||||
}
|
||||
// Converting Lua is an expensive operation, so
|
||||
// don't do it if there are no outputs.
|
||||
if len(r.Logic.Outputs) == 0 {
|
||||
return pkglog.RunInfo{Level: slog.LevelWarn, Message: "rule with no output channels", Details: details}
|
||||
}
|
||||
var err error
|
||||
res := convertLua(result)
|
||||
for _, o := range r.Logic.Outputs {
|
||||
// If value is false, don't run the follow-up.
|
||||
if v, ok := res.(bool); ok && !v {
|
||||
return pkglog.RunInfo{Level: slog.LevelInfo, Message: "logic returned false", Details: details}
|
||||
}
|
||||
if e := re.handleOutput(ctx, o, r, msg, res); e != nil {
|
||||
err = errors.Wrap(e, err)
|
||||
}
|
||||
}
|
||||
ret := pkglog.RunInfo{Level: slog.LevelInfo, Message: "rule processed successfully", Details: details}
|
||||
if err != nil {
|
||||
ret.Level = slog.LevelError
|
||||
ret.Message = fmt.Sprintf("failed to handle rule output: %s", err)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (re *re) handleOutput(ctx context.Context, o ScriptOutput, r Rule, msg *messaging.Message, val interface{}) error {
|
||||
|
||||
@@ -4,8 +4,13 @@
|
||||
package re
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
|
||||
pkglog "github.com/absmach/magistrala/pkg/logger"
|
||||
"github.com/absmach/supermq/pkg/errors"
|
||||
"github.com/absmach/supermq/pkg/messaging"
|
||||
"github.com/vadv/gopher-lua-libs/argparse"
|
||||
"github.com/vadv/gopher-lua-libs/base64"
|
||||
@@ -26,6 +31,53 @@ import (
|
||||
|
||||
const payloadKey = "payload"
|
||||
|
||||
func (re *re) processLua(ctx context.Context, details []slog.Attr, r Rule, msg *messaging.Message) pkglog.RunInfo {
|
||||
l := lua.NewState()
|
||||
defer l.Close()
|
||||
preload(l)
|
||||
message := prepareMsg(l, msg)
|
||||
|
||||
// Set the message object as a Lua global variable.
|
||||
l.SetGlobal("message", message)
|
||||
|
||||
// Set binding functions as a Lua global functions.
|
||||
l.SetGlobal("send_email", l.NewFunction(re.sendEmail))
|
||||
l.SetGlobal("send_alarm", l.NewFunction(re.sendAlarm(ctx, r.ID, msg)))
|
||||
l.SetGlobal("aes_encrypt", l.NewFunction(luaEncrypt))
|
||||
l.SetGlobal("aes_decrypt", l.NewFunction(luaDecrypt))
|
||||
|
||||
if err := l.DoString(r.Logic.Value); err != nil {
|
||||
return pkglog.RunInfo{Level: slog.LevelError, Message: fmt.Sprintf("failed to run rule logic: %s", err), Details: details}
|
||||
}
|
||||
// Get the last result.
|
||||
result := l.Get(-1)
|
||||
if result == lua.LNil {
|
||||
return pkglog.RunInfo{Level: slog.LevelWarn, Message: "rule with nil script result", Details: details}
|
||||
}
|
||||
// Converting Lua is an expensive operation, so
|
||||
// don't do it if there are no outputs.
|
||||
if len(r.Logic.Outputs) == 0 {
|
||||
return pkglog.RunInfo{Level: slog.LevelWarn, Message: "rule with no output channels", Details: details}
|
||||
}
|
||||
var err error
|
||||
res := convertLua(result)
|
||||
for _, o := range r.Logic.Outputs {
|
||||
// If value is false, don't run the follow-up.
|
||||
if v, ok := res.(bool); ok && !v {
|
||||
return pkglog.RunInfo{Level: slog.LevelInfo, Message: "logic returned false", Details: details}
|
||||
}
|
||||
if e := re.handleOutput(ctx, o, r, msg, res); e != nil {
|
||||
err = errors.Wrap(e, err)
|
||||
}
|
||||
}
|
||||
ret := pkglog.RunInfo{Level: slog.LevelInfo, Message: "rule processed successfully", Details: details}
|
||||
if err != nil {
|
||||
ret.Level = slog.LevelError
|
||||
ret.Message = fmt.Sprintf("failed to handle rule output: %s", err)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func preload(l *lua.LState) {
|
||||
db.Preload(l)
|
||||
ioutil.Preload(l)
|
||||
|
||||
@@ -12,6 +12,11 @@ import (
|
||||
"github.com/absmach/supermq/pkg/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
LuaType ScriptType = iota
|
||||
GoType
|
||||
)
|
||||
|
||||
const protocol = "nats"
|
||||
|
||||
// ScriptOutput is the indicator for type of the logic
|
||||
|
||||
Reference in New Issue
Block a user