From de6f3921a43252ae4306da813a47e8f597c8f027 Mon Sep 17 00:00:00 2001 From: Steve Munene Date: Tue, 3 Mar 2026 17:34:27 +0300 Subject: [PATCH] MG-406 - Fix `panic` issues in rules engine go script (#407) * initial implementation Signed-off-by: nyagamunene * revert variable Signed-off-by: nyagamunene * fix tests Signed-off-by: nyagamunene * fix tests Signed-off-by: nyagamunene * address comments Signed-off-by: nyagamunene * update tests Signed-off-by: nyagamunene * update tests Signed-off-by: nyagamunene --------- Signed-off-by: nyagamunene --- re/golang.go | 17 +++++++++-- re/service.go | 10 ++++++ re/service_test.go | 76 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 101 insertions(+), 2 deletions(-) diff --git a/re/golang.go b/re/golang.go index a7a831eeb..5ee8acecc 100644 --- a/re/golang.go +++ b/re/golang.go @@ -9,6 +9,7 @@ import ( "fmt" "log/slog" "reflect" + "regexp" pkglog "github.com/absmach/magistrala/pkg/logger" "github.com/absmach/supermq/pkg/errors" @@ -19,6 +20,8 @@ import ( const logicFunction = "main.logicFunction" +var goKeywordRegex = regexp.MustCompile(`\bgo\s+func\s*\(|^\s*go\s+\w+\(|[;\s{]go\s+func\s*\(|[;\s{]go\s+\w+\(`) + // Type message is an SMQ message with payload replaces by JSON deserialized payload. type message struct { Channel string `json:"channel,omitempty"` @@ -30,7 +33,17 @@ type message struct { Payload any `json:"payload,omitempty"` } -func (re *re) processGo(ctx context.Context, details []slog.Attr, r Rule, msg *messaging.Message) pkglog.RunInfo { +func (re *re) processGo(ctx context.Context, details []slog.Attr, r Rule, msg *messaging.Message) (ret pkglog.RunInfo) { + defer func() { + if r := recover(); r != nil { + ret = pkglog.RunInfo{ + Level: slog.LevelError, + Details: details, + Message: fmt.Sprintf("panic in Go script: %v", r), + } + } + }() + i := golang.New(golang.Options{}) if err := i.Use(stdlib.Symbols); err != nil { return pkglog.RunInfo{Level: slog.LevelError, Details: details, Message: err.Error()} @@ -77,7 +90,7 @@ func (re *re) processGo(ctx context.Context, details []slog.Attr, r Rule, msg *m err = errors.Wrap(e, err) } } - ret := pkglog.RunInfo{Level: slog.LevelInfo, Details: details, Message: "rule processed successfully"} + ret = pkglog.RunInfo{Level: slog.LevelInfo, Details: details, Message: "rule processed successfully"} if err != nil { ret.Level = slog.LevelError ret.Message = fmt.Sprintf("failed to handle rule output: %s", err) diff --git a/re/service.go b/re/service.go index 241fe6ea6..e041d901c 100644 --- a/re/service.go +++ b/re/service.go @@ -18,6 +18,8 @@ import ( "github.com/absmach/supermq/pkg/messaging" ) +var ErrGoroutinesNotAllowed = errors.New("goroutines are not allowed in Go scripts") + type re struct { repo Repository runInfo chan pkglog.RunInfo @@ -45,6 +47,10 @@ func NewService(repo Repository, runInfo chan pkglog.RunInfo, idp supermq.IDProv } func (re *re) AddRule(ctx context.Context, session authn.Session, r Rule) (Rule, error) { + if r.Logic.Type == GoType && goKeywordRegex.MatchString(r.Logic.Value) { + return Rule{}, errors.Wrap(svcerr.ErrMalformedEntity, ErrGoroutinesNotAllowed) + } + id, err := re.idp.ID() if err != nil { return Rule{}, err @@ -79,6 +85,10 @@ func (re *re) ViewRule(ctx context.Context, session authn.Session, id string) (R } func (re *re) UpdateRule(ctx context.Context, session authn.Session, r Rule) (Rule, error) { + if r.Logic.Type == GoType && goKeywordRegex.MatchString(r.Logic.Value) { + return Rule{}, errors.Wrap(svcerr.ErrMalformedEntity, ErrGoroutinesNotAllowed) + } + r.UpdatedAt = time.Now().UTC() r.UpdatedBy = session.UserID rule, err := re.repo.UpdateRule(ctx, r) diff --git a/re/service_test.go b/re/service_test.go index f5d949368..7cc8285a7 100644 --- a/re/service_test.go +++ b/re/service_test.go @@ -160,6 +160,27 @@ func TestAddRule(t *testing.T) { }, err: nil, }, + { + desc: "Add rule with Go script containing goroutines", + session: authn.Session{ + UserID: userID, + DomainID: domainID, + }, + rule: re.Rule{ + Name: ruleName, + InputChannel: inputChannel, + Logic: re.Script{ + Type: re.GoType, + Value: `func logicFunction() any { go func() {}(); return true }`, + }, + Schedule: pkgSch.Schedule{ + Recurring: pkgSch.Daily, + RecurringPeriod: 1, + Time: now, + }, + }, + err: re.ErrGoroutinesNotAllowed, + }, } for _, tc := range cases { @@ -306,6 +327,31 @@ func TestUpdateRule(t *testing.T) { }, err: svcerr.ErrUpdateEntity, }, + { + desc: "update rule with Go script containing goroutines", + session: authn.Session{ + UserID: userID, + DomainID: domainID, + }, + rule: re.Rule{ + Name: ruleName, + ID: ruleID, + InputChannel: inputChannel, + Logic: re.Script{ + Type: re.GoType, + Value: `func logicFunction() any { go processData(); return true }`, + }, + Schedule: pkgSch.Schedule{ + Recurring: pkgSch.Daily, + RecurringPeriod: 1, + Time: now, + }, + Status: re.EnabledStatus, + CreatedBy: userID, + DomainID: domainID, + }, + err: re.ErrGoroutinesNotAllowed, + }, } for _, tc := range cases { @@ -1261,6 +1307,36 @@ func TestHandle(t *testing.T) { }, listErr: nil, }, + { + desc: "consume message with GoType script that panics", + message: &messaging.Message{ + Channel: inputChannel, + Created: now.Unix(), + Payload: []byte(`{"value": 42}`), + }, + page: re.Page{ + Rules: []re.Rule{ + { + ID: testsutil.GenerateUUID(t), + Name: namegen.Generate(), + InputChannel: inputChannel, + Status: re.EnabledStatus, + Logic: re.Script{ + Type: re.GoType, + Value: `func logicFunction() any { panic("test") }`, + }, + Outputs: re.Outputs{ + &outputs.ChannelPublisher{ + Channel: "output.channel", + Topic: "output.topic", + }, + }, + Schedule: schedule, + }, + }, + }, + listErr: nil, + }, { desc: "consume message with Lua script and Postgres output", message: &messaging.Message{