mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 07:00:25 +00:00
MG-181 - Add wildcard support to RE (#185)
* Add wildcard Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com> * Fix RE tests Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com> * Add comments for wildcards Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com> * Fix match check Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com> --------- Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>
This commit is contained in:
@@ -5,7 +5,7 @@ go 1.24.2
|
||||
require (
|
||||
github.com/0x6flab/namegenerator v1.4.0
|
||||
github.com/absmach/callhome v0.14.0
|
||||
github.com/absmach/supermq v0.16.1-0.20250526123821-a95d1bb672ae
|
||||
github.com/absmach/supermq v0.16.1-0.20250602095825-5e96516bf6fb
|
||||
github.com/authzed/authzed-go v1.4.0
|
||||
github.com/authzed/grpcutil v0.0.0-20250221190651-1985b19b35b8
|
||||
github.com/caarlos0/env/v11 v11.3.1
|
||||
@@ -47,6 +47,7 @@ require (
|
||||
|
||||
require (
|
||||
github.com/boombuler/barcode v1.0.1 // indirect
|
||||
github.com/cenkalti/backoff/v5 v5.0.2 // indirect
|
||||
github.com/jung-kurt/gofpdf v1.16.2 // indirect
|
||||
github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245 // indirect
|
||||
)
|
||||
@@ -132,7 +133,7 @@ require (
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.36.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.36.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.36.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.36.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.6.0 // indirect
|
||||
@@ -141,7 +142,7 @@ require (
|
||||
golang.org/x/net v0.40.0 // indirect
|
||||
golang.org/x/sys v0.33.0 // indirect
|
||||
golang.org/x/text v0.25.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250428153025-10db94c68c34 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250519155744-55703ea1f237 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250519155744-55703ea1f237 // indirect
|
||||
google.golang.org/protobuf v1.36.6
|
||||
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
|
||||
|
||||
@@ -28,8 +28,8 @@ github.com/absmach/mgate v0.4.6-0.20250425104654-79c62d581921 h1:Y0M0jtSbKmfrwLW
|
||||
github.com/absmach/mgate v0.4.6-0.20250425104654-79c62d581921/go.mod h1:BYazn/DsEeZxJxWZxy/5NiaS/CfWpR/5auYmbq43VwQ=
|
||||
github.com/absmach/senml v1.0.7 h1:XLvpw0qxbP2QhOz7KLM2ZRar+vSCpSG/0o0kEvWx3No=
|
||||
github.com/absmach/senml v1.0.7/go.mod h1:3bRIiNc8hq7l3auMs8gQrpsM5hHy7iDuiLILrf/+MfA=
|
||||
github.com/absmach/supermq v0.16.1-0.20250526123821-a95d1bb672ae h1:mt3KTdf/ho+xwS875Rki13r5QaI6pddMINddI/6/5wI=
|
||||
github.com/absmach/supermq v0.16.1-0.20250526123821-a95d1bb672ae/go.mod h1:xb0C6v2mjpGqH/JKeziBknznfOlfeYDPBzTu5NSVNtg=
|
||||
github.com/absmach/supermq v0.16.1-0.20250602095825-5e96516bf6fb h1:4RGAt42da6v6CkAOb9gDUNHSIzIlo8dXtrnDlrozA+8=
|
||||
github.com/absmach/supermq v0.16.1-0.20250602095825-5e96516bf6fb/go.mod h1:WbcEX19AtubL9txRVv5wnU0xueSOOjad0JJ/1bgMntE=
|
||||
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
@@ -58,6 +58,8 @@ github.com/caarlos0/env/v11 v11.3.1/go.mod h1:qupehSf/Y0TUTsxKywqRt/vJjN5nz6vaui
|
||||
github.com/cbroglie/mustache v1.0.1/go.mod h1:R/RUa+SobQ14qkP4jtx5Vke5sDytONDQXNLPY/PO69g=
|
||||
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
|
||||
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
|
||||
github.com/cenkalti/backoff/v5 v5.0.2 h1:rIfFVxEf1QsI7E1ZHfp/B4DF/6QBAUhmgkxc0H7Zss8=
|
||||
github.com/cenkalti/backoff/v5 v5.0.2/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d h1:S2NE3iHSwP0XV47EEXL8mWmRdEfGscSJ+7EgePNgt0s=
|
||||
github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
|
||||
@@ -514,8 +516,8 @@ go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg=
|
||||
go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.36.0 h1:dNzwXjZKpMpE2JhmO+9HsPl42NIXFIFSUSSs0fiqra0=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.36.0/go.mod h1:90PoxvaEB5n6AOdZvi+yWJQoE95U8Dhhw2bSyRqnTD0=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 h1:xJ2qHD0C1BeYVTLLR9sX12+Qb95kfeD/byKj6Ky1pXg=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0/go.mod h1:u5BF1xyjstDowA1R5QAO9JHzqK+ublenEW/dyqTjBVk=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.36.0 h1:nRVXXvf78e00EwY6Wp0YII8ww2JVWshZ20HfTlE11AM=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.36.0/go.mod h1:r49hO7CgrxY9Voaj3Xe8pANWtr0Oq916d0XAmOoCZAQ=
|
||||
go.opentelemetry.io/otel/metric v1.36.0 h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCREuTYufE=
|
||||
go.opentelemetry.io/otel/metric v1.36.0/go.mod h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs=
|
||||
go.opentelemetry.io/otel/sdk v1.36.0 h1:b6SYIuLRs88ztox4EyrvRti80uXIFy+Sqzoh9kFULbs=
|
||||
@@ -695,8 +697,8 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
|
||||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
|
||||
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
|
||||
google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250428153025-10db94c68c34 h1:0PeQib/pH3nB/5pEmFeVQJotzGohV0dq4Vcp09H5yhE=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250428153025-10db94c68c34/go.mod h1:0awUlEkap+Pb1UMeJwJQQAdJQrt3moU7J2moTy69irI=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250519155744-55703ea1f237 h1:Kog3KlB4xevJlAcbbbzPfRG0+X9fdoGM+UBRKVz6Wr0=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250519155744-55703ea1f237/go.mod h1:ezi0AVyMKDWy5xAncvjLWH7UcLBB5n7y2fQ8MzjJcto=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250519155744-55703ea1f237 h1:cJfm9zPbe1e873mHJzmQ1nwVEeRDU/T1wXDK2kUSU34=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250519155744-55703ea1f237/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
|
||||
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
||||
|
||||
+39
-9
@@ -8,6 +8,7 @@ import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/absmach/supermq/pkg/errors"
|
||||
@@ -15,6 +16,11 @@ import (
|
||||
lua "github.com/yuin/gopher-lua"
|
||||
)
|
||||
|
||||
var (
|
||||
scheduledTrue = true
|
||||
scheduledFalse = false
|
||||
)
|
||||
|
||||
const (
|
||||
maxPayload = 100 * 1024
|
||||
pldExceededFmt = "max payload size of 100kB exceeded: "
|
||||
@@ -25,11 +31,14 @@ func (re *re) Handle(msg *messaging.Message) error {
|
||||
if n := len(msg.Payload); n > maxPayload {
|
||||
return errors.New(pldExceededFmt + strconv.Itoa(n))
|
||||
}
|
||||
// Skip filtering by message topic and fetch all non-scheduled rules instead.
|
||||
// It's cleaner and more efficient to match wildcards in Go, but we can
|
||||
// revisit this if it ever becomes a performance bottleneck.
|
||||
pm := PageMeta{
|
||||
Domain: msg.Domain,
|
||||
InputChannel: msg.Channel,
|
||||
Status: EnabledStatus,
|
||||
InputTopic: &msg.Subtopic,
|
||||
Scheduled: &scheduledFalse,
|
||||
}
|
||||
ctx := context.Background()
|
||||
page, err := re.repo.ListRules(ctx, pm)
|
||||
@@ -38,14 +47,36 @@ func (re *re) Handle(msg *messaging.Message) error {
|
||||
}
|
||||
|
||||
for _, r := range page.Rules {
|
||||
go func(ctx context.Context) {
|
||||
re.runInfo <- re.process(ctx, r, msg)
|
||||
}(ctx)
|
||||
if matchSubject(msg.Subtopic, r.InputTopic) {
|
||||
go func(ctx context.Context) {
|
||||
re.runInfo <- re.process(ctx, r, msg)
|
||||
}(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Match NATS subject to support wildcardas.
|
||||
func matchSubject(published, subscribed string) bool {
|
||||
p := strings.Split(published, ".")
|
||||
s := strings.Split(subscribed, ".")
|
||||
n := len(p)
|
||||
|
||||
for i := range s {
|
||||
if s[i] == ">" {
|
||||
return true
|
||||
}
|
||||
if i >= n {
|
||||
return false
|
||||
}
|
||||
if s[i] != "*" && p[i] != s[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return len(s) == n
|
||||
}
|
||||
|
||||
func (re *re) process(ctx context.Context, r Rule, msg *messaging.Message) RunInfo {
|
||||
l := lua.NewState()
|
||||
defer l.Close()
|
||||
@@ -65,7 +96,7 @@ func (re *re) process(ctx context.Context, r Rule, msg *messaging.Message) RunIn
|
||||
slog.String("domain_id", r.DomainID),
|
||||
slog.String("rule_id", r.ID),
|
||||
slog.String("rule_name", r.Name),
|
||||
slog.Time("time", time.Now().UTC()),
|
||||
slog.Time("exec_time", time.Now().UTC()),
|
||||
}
|
||||
if err := l.DoString(r.Logic.Value); err != nil {
|
||||
return RunInfo{Level: slog.LevelError, Message: fmt.Sprintf("failed to run rule logic: %s", err), Details: details}
|
||||
@@ -116,16 +147,15 @@ func (re *re) handleOutput(ctx context.Context, o ScriptOutput, r Rule, msg *mes
|
||||
|
||||
func (re *re) StartScheduler(ctx context.Context) error {
|
||||
defer re.ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-re.ticker.Tick():
|
||||
due := time.Now().UTC()
|
||||
|
||||
pm := PageMeta{
|
||||
Status: EnabledStatus,
|
||||
Scheduled: &scheduledTrue,
|
||||
ScheduledBefore: &due,
|
||||
}
|
||||
|
||||
@@ -156,7 +186,7 @@ func (re *re) StartScheduler(ctx context.Context) error {
|
||||
re.runInfo <- re.process(ctx, rule, msg)
|
||||
}(r)
|
||||
}
|
||||
// Reset due, it will reset the page meta as well.
|
||||
// Reset due, it will reset in the page meta as well.
|
||||
due = time.Now().UTC()
|
||||
|
||||
reportConfigs, err := re.repo.ListReportsConfig(ctx, pm)
|
||||
@@ -181,7 +211,7 @@ func (re *re) StartScheduler(ctx context.Context) error {
|
||||
slog.String("domain_id", cfg.DomainID),
|
||||
slog.String("report_id", cfg.ID),
|
||||
slog.String("report_name", cfg.Name),
|
||||
slog.Time("time", time.Now().UTC()),
|
||||
slog.Time("exec_time", time.Now().UTC()),
|
||||
},
|
||||
}
|
||||
if err != nil {
|
||||
|
||||
@@ -342,6 +342,9 @@ func pageRulesQuery(pm re.PageMeta) string {
|
||||
if pm.Name != "" {
|
||||
query = append(query, "r.name ILIKE '%' || :name || '%'")
|
||||
}
|
||||
if pm.Scheduled != nil && !*pm.Scheduled {
|
||||
query = append(query, "r.time IS NULL")
|
||||
}
|
||||
|
||||
var q string
|
||||
if len(query) > 0 {
|
||||
|
||||
@@ -50,6 +50,7 @@ type PageMeta struct {
|
||||
Name string `json:"name" db:"name"`
|
||||
InputChannel string `json:"input_channel,omitempty" db:"input_channel"`
|
||||
InputTopic *string `json:"input_topic,omitempty" db:"input_topic"`
|
||||
Scheduled *bool `json:"scheduled,omitempty"`
|
||||
OutputChannel string `json:"output_channel,omitempty" db:"output_channel"`
|
||||
Status Status `json:"status,omitempty" db:"status"`
|
||||
Domain string `json:"domain_id,omitempty" db:"domain_id"`
|
||||
|
||||
+2
-14
@@ -545,12 +545,10 @@ func TestDisableRule(t *testing.T) {
|
||||
func TestHandle(t *testing.T) {
|
||||
svc, repo, pubmocks, _ := newService(t, make(chan re.RunInfo))
|
||||
now := time.Now()
|
||||
empty := ""
|
||||
|
||||
scheduled := false
|
||||
cases := []struct {
|
||||
desc string
|
||||
message *messaging.Message
|
||||
pageMeta re.PageMeta
|
||||
page re.Page
|
||||
listErr error
|
||||
publishErr error
|
||||
@@ -562,11 +560,6 @@ func TestHandle(t *testing.T) {
|
||||
Channel: inputChannel,
|
||||
Created: now.Unix(),
|
||||
},
|
||||
pageMeta: re.PageMeta{
|
||||
InputChannel: inputChannel,
|
||||
InputTopic: &empty,
|
||||
Status: re.EnabledStatus,
|
||||
},
|
||||
page: re.Page{
|
||||
Rules: []re.Rule{},
|
||||
},
|
||||
@@ -578,11 +571,6 @@ func TestHandle(t *testing.T) {
|
||||
Channel: inputChannel,
|
||||
Created: now.Unix(),
|
||||
},
|
||||
pageMeta: re.PageMeta{
|
||||
InputChannel: inputChannel,
|
||||
Status: re.EnabledStatus,
|
||||
InputTopic: &empty,
|
||||
},
|
||||
page: re.Page{
|
||||
Rules: []re.Rule{
|
||||
{
|
||||
@@ -606,7 +594,7 @@ func TestHandle(t *testing.T) {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
var err error
|
||||
|
||||
repoCall := repo.On("ListRules", mock.Anything, tc.pageMeta).Return(tc.page, tc.listErr).Run(func(args mock.Arguments) {
|
||||
repoCall := repo.On("ListRules", mock.Anything, re.PageMeta{Domain: tc.message.Domain, InputChannel: tc.message.Channel, Scheduled: &scheduled}).Return(tc.page, tc.listErr).Run(func(args mock.Arguments) {
|
||||
if tc.listErr != nil {
|
||||
err = tc.listErr
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user