mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 07:00:25 +00:00
NOISSUE - Fix Golang RE integration
Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>
This commit is contained in:
+43
-8
@@ -5,6 +5,7 @@ package re
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"reflect"
|
||||
@@ -16,28 +17,62 @@ import (
|
||||
"github.com/traefik/yaegi/stdlib"
|
||||
)
|
||||
|
||||
const logicFunction = "main.logicFunction"
|
||||
|
||||
// Type message is an SMQ message with payload replaces by JSON deserialized payload.
|
||||
type message struct {
|
||||
Channel string `json:"channel,omitempty"`
|
||||
Domain string `json:"domain,omitempty"`
|
||||
Subtopic string `json:"subtopic,omitempty"`
|
||||
Publisher string `json:"publisher,omitempty"`
|
||||
Protocol string `json:"protocol,omitempty"`
|
||||
Created int64 `json:"created,omitempty"`
|
||||
Payload any `json:"payload,omitempty"`
|
||||
}
|
||||
|
||||
func (re *re) processGo(ctx context.Context, details []slog.Attr, r Rule, msg *messaging.Message) pkglog.RunInfo {
|
||||
i := golang.New(golang.Options{})
|
||||
if err := i.Use(stdlib.Symbols); err != nil {
|
||||
return pkglog.RunInfo{Level: slog.LevelError, Details: details, Message: err.Error()}
|
||||
}
|
||||
err := i.Use(map[string]map[string]reflect.Value{
|
||||
"main": {
|
||||
"message": reflect.ValueOf(&msg).Elem(),
|
||||
m := message{
|
||||
Created: msg.Created,
|
||||
Domain: msg.Domain,
|
||||
Publisher: msg.Publisher,
|
||||
Channel: msg.Channel,
|
||||
Subtopic: msg.Subtopic,
|
||||
Protocol: msg.Protocol,
|
||||
}
|
||||
var pld any
|
||||
if err := json.Unmarshal(msg.Payload, &pld); err != nil {
|
||||
pld = msg.Payload
|
||||
}
|
||||
m.Payload = pld
|
||||
|
||||
err := i.Use(golang.Exports{
|
||||
"messaging/m": {
|
||||
"message": reflect.ValueOf(m),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return pkglog.RunInfo{Level: slog.LevelError, Details: details, Message: err.Error()}
|
||||
}
|
||||
res, err := i.Eval(r.Logic.Value)
|
||||
if _, err = i.Eval(r.Logic.Value); err != nil {
|
||||
return pkglog.RunInfo{Level: slog.LevelError, Details: details, Message: err.Error()}
|
||||
}
|
||||
ifc, err := i.Eval(logicFunction)
|
||||
if err != nil {
|
||||
return pkglog.RunInfo{Level: slog.LevelError, Details: details, Message: err.Error()}
|
||||
}
|
||||
|
||||
f, ok := ifc.Interface().(func() any)
|
||||
if !ok {
|
||||
return pkglog.RunInfo{Level: slog.LevelError, Message: "invalid logic function signature", Details: details}
|
||||
}
|
||||
res := f()
|
||||
if b, ok := res.(bool); ok && !b {
|
||||
return pkglog.RunInfo{Level: slog.LevelInfo, Message: "logic returned false", Details: details}
|
||||
}
|
||||
for _, o := range r.Outputs {
|
||||
if res.Kind() == reflect.Bool && !res.Bool() {
|
||||
return pkglog.RunInfo{Level: slog.LevelInfo, Message: "logic returned false", Details: details}
|
||||
}
|
||||
if e := re.handleOutput(ctx, o, r, msg, res); e != nil {
|
||||
err = errors.Wrap(e, err)
|
||||
}
|
||||
|
||||
+1
-1
@@ -94,7 +94,7 @@ func (re *re) process(ctx context.Context, r Rule, msg *messaging.Message) pkglo
|
||||
}
|
||||
}
|
||||
|
||||
func (re *re) handleOutput(ctx context.Context, o Runnable, r Rule, msg *messaging.Message, val interface{}) error {
|
||||
func (re *re) handleOutput(ctx context.Context, o Runnable, r Rule, msg *messaging.Message, val any) error {
|
||||
switch o := o.(type) {
|
||||
case *outputs.Alarm:
|
||||
o.AlarmsPub = re.alarmsPub
|
||||
|
||||
Reference in New Issue
Block a user