MG-406 - Fix panic issues in rules engine go script (#407)

* initial implementation

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* revert variable

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix tests

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix tests

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* address comments

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* update tests

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* update tests

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

---------

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>
This commit is contained in:
Steve Munene
2026-03-03 17:34:27 +03:00
committed by GitHub
parent 0a45a96fac
commit de6f3921a4
3 changed files with 101 additions and 2 deletions
+15 -2
View File
@@ -9,6 +9,7 @@ import (
"fmt"
"log/slog"
"reflect"
"regexp"
pkglog "github.com/absmach/magistrala/pkg/logger"
"github.com/absmach/supermq/pkg/errors"
@@ -19,6 +20,8 @@ import (
const logicFunction = "main.logicFunction"
var goKeywordRegex = regexp.MustCompile(`\bgo\s+func\s*\(|^\s*go\s+\w+\(|[;\s{]go\s+func\s*\(|[;\s{]go\s+\w+\(`)
// Type message is an SMQ message with payload replaces by JSON deserialized payload.
type message struct {
Channel string `json:"channel,omitempty"`
@@ -30,7 +33,17 @@ type message struct {
Payload any `json:"payload,omitempty"`
}
func (re *re) processGo(ctx context.Context, details []slog.Attr, r Rule, msg *messaging.Message) pkglog.RunInfo {
func (re *re) processGo(ctx context.Context, details []slog.Attr, r Rule, msg *messaging.Message) (ret pkglog.RunInfo) {
defer func() {
if r := recover(); r != nil {
ret = pkglog.RunInfo{
Level: slog.LevelError,
Details: details,
Message: fmt.Sprintf("panic in Go script: %v", r),
}
}
}()
i := golang.New(golang.Options{})
if err := i.Use(stdlib.Symbols); err != nil {
return pkglog.RunInfo{Level: slog.LevelError, Details: details, Message: err.Error()}
@@ -77,7 +90,7 @@ func (re *re) processGo(ctx context.Context, details []slog.Attr, r Rule, msg *m
err = errors.Wrap(e, err)
}
}
ret := pkglog.RunInfo{Level: slog.LevelInfo, Details: details, Message: "rule processed successfully"}
ret = pkglog.RunInfo{Level: slog.LevelInfo, Details: details, Message: "rule processed successfully"}
if err != nil {
ret.Level = slog.LevelError
ret.Message = fmt.Sprintf("failed to handle rule output: %s", err)
+10
View File
@@ -18,6 +18,8 @@ import (
"github.com/absmach/supermq/pkg/messaging"
)
var ErrGoroutinesNotAllowed = errors.New("goroutines are not allowed in Go scripts")
type re struct {
repo Repository
runInfo chan pkglog.RunInfo
@@ -45,6 +47,10 @@ func NewService(repo Repository, runInfo chan pkglog.RunInfo, idp supermq.IDProv
}
func (re *re) AddRule(ctx context.Context, session authn.Session, r Rule) (Rule, error) {
if r.Logic.Type == GoType && goKeywordRegex.MatchString(r.Logic.Value) {
return Rule{}, errors.Wrap(svcerr.ErrMalformedEntity, ErrGoroutinesNotAllowed)
}
id, err := re.idp.ID()
if err != nil {
return Rule{}, err
@@ -79,6 +85,10 @@ func (re *re) ViewRule(ctx context.Context, session authn.Session, id string) (R
}
func (re *re) UpdateRule(ctx context.Context, session authn.Session, r Rule) (Rule, error) {
if r.Logic.Type == GoType && goKeywordRegex.MatchString(r.Logic.Value) {
return Rule{}, errors.Wrap(svcerr.ErrMalformedEntity, ErrGoroutinesNotAllowed)
}
r.UpdatedAt = time.Now().UTC()
r.UpdatedBy = session.UserID
rule, err := re.repo.UpdateRule(ctx, r)
+76
View File
@@ -160,6 +160,27 @@ func TestAddRule(t *testing.T) {
},
err: nil,
},
{
desc: "Add rule with Go script containing goroutines",
session: authn.Session{
UserID: userID,
DomainID: domainID,
},
rule: re.Rule{
Name: ruleName,
InputChannel: inputChannel,
Logic: re.Script{
Type: re.GoType,
Value: `func logicFunction() any { go func() {}(); return true }`,
},
Schedule: pkgSch.Schedule{
Recurring: pkgSch.Daily,
RecurringPeriod: 1,
Time: now,
},
},
err: re.ErrGoroutinesNotAllowed,
},
}
for _, tc := range cases {
@@ -306,6 +327,31 @@ func TestUpdateRule(t *testing.T) {
},
err: svcerr.ErrUpdateEntity,
},
{
desc: "update rule with Go script containing goroutines",
session: authn.Session{
UserID: userID,
DomainID: domainID,
},
rule: re.Rule{
Name: ruleName,
ID: ruleID,
InputChannel: inputChannel,
Logic: re.Script{
Type: re.GoType,
Value: `func logicFunction() any { go processData(); return true }`,
},
Schedule: pkgSch.Schedule{
Recurring: pkgSch.Daily,
RecurringPeriod: 1,
Time: now,
},
Status: re.EnabledStatus,
CreatedBy: userID,
DomainID: domainID,
},
err: re.ErrGoroutinesNotAllowed,
},
}
for _, tc := range cases {
@@ -1261,6 +1307,36 @@ func TestHandle(t *testing.T) {
},
listErr: nil,
},
{
desc: "consume message with GoType script that panics",
message: &messaging.Message{
Channel: inputChannel,
Created: now.Unix(),
Payload: []byte(`{"value": 42}`),
},
page: re.Page{
Rules: []re.Rule{
{
ID: testsutil.GenerateUUID(t),
Name: namegen.Generate(),
InputChannel: inputChannel,
Status: re.EnabledStatus,
Logic: re.Script{
Type: re.GoType,
Value: `func logicFunction() any { panic("test") }`,
},
Outputs: re.Outputs{
&outputs.ChannelPublisher{
Channel: "output.channel",
Topic: "output.topic",
},
},
Schedule: schedule,
},
},
},
listErr: nil,
},
{
desc: "consume message with Lua script and Postgres output",
message: &messaging.Message{