NOISSUE - Fix RE message handling (#113)

* NOISSUE - Fix RE message handing

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* NOISSUE - Fix docker compose

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Add supprot for Lua message list

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix Lua indexing

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Update SMQ

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix transformer key

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix typo and add int value

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Add JSON traverse

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Update SMQ

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Add supprot for nested objects

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix JSON traversal

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

---------

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>
This commit is contained in:
Dušan Borovčanin
2025-04-10 23:07:42 +02:00
committed by GitHub
parent 99a3505003
commit 4b5474ce28
6 changed files with 121 additions and 46 deletions
+1 -1
View File
@@ -213,4 +213,4 @@ services:
target: /auth-grpc-server-ca${SMQ_AUTH_GRPC_SERVER_CA_CERTS:+.crt}
bind:
create_host_path: true
- ./config.toml:/config.toml
- ./re_config.toml:/config.toml
+11
View File
@@ -0,0 +1,11 @@
# Copyright (c) Abstract Machines
# SPDX-License-Identifier: Apache-2.0
# To listen all messsage broker subjects use default value "channels.>".
# To subscribe to specific subjects use values starting by "channels." and
# followed by a subtopic (e.g ["channels.<channel_id>.sub.topic.x", ...]).
[subjects]
filter = ["channels.>"]
[transformer]
format = ""
+2 -2
View File
@@ -318,9 +318,9 @@ definition domain {
channel_update + channel_read + channel_delete + channel_set_parent_group + channel_connect_to_client + channel_publish + channel_subscribe +
channel_manage_role + channel_add_role_users + channel_remove_role_users + channel_view_role_users +
group_update + group_membership + group_read + group_delete + group_set_child + group_set_parent +
group_manage_role + group_add_role_users + group_remove_role_users + group_view_role_users
group_manage_role + group_add_role_users + group_remove_role_users + group_view_role_users + organization->admin
permission admin = read & update & enable & disable & delete & manage_role & add_role_users & remove_role_users & view_role_users
permission admin = (read & update & enable & disable & delete & manage_role & add_role_users & remove_role_users & view_role_users) + organization->admin
permission client_create_permission = client_create + team->client_create + organization->admin
permission channel_create_permission = channel_create + team->channel_create + organization->admin
+1 -1
View File
@@ -5,7 +5,7 @@ go 1.24.2
require (
github.com/0x6flab/namegenerator v1.4.0
github.com/absmach/callhome v0.14.0
github.com/absmach/supermq v0.16.1-0.20250409211847-814f19390d0e
github.com/absmach/supermq v0.16.1-0.20250410145505-299cee777101
github.com/authzed/authzed-go v1.3.1-0.20250320210445-0cde0d8c71e2
github.com/authzed/grpcutil v0.0.0-20250221190651-1985b19b35b8
github.com/caarlos0/env/v11 v11.3.1
+2 -2
View File
@@ -28,8 +28,8 @@ github.com/absmach/mgate v0.4.5 h1:l6RmrEsR9jxkdb9WHUSecmT0HA41TkZZQVffFfUAIfI=
github.com/absmach/mgate v0.4.5/go.mod h1:IvRIHZexZPEIAPmmaJF0L5DY2ERjj+GxRGitOW4s6qo=
github.com/absmach/senml v1.0.7 h1:XLvpw0qxbP2QhOz7KLM2ZRar+vSCpSG/0o0kEvWx3No=
github.com/absmach/senml v1.0.7/go.mod h1:3bRIiNc8hq7l3auMs8gQrpsM5hHy7iDuiLILrf/+MfA=
github.com/absmach/supermq v0.16.1-0.20250409211847-814f19390d0e h1:ne0gLjWDr+AuvSeE7FINn4bXls4nk6GMURpVT+HjX18=
github.com/absmach/supermq v0.16.1-0.20250409211847-814f19390d0e/go.mod h1:dJqO3luvt+zLVuDhxOdsRRCuv945F86Nf1BXxFro+nU=
github.com/absmach/supermq v0.16.1-0.20250410145505-299cee777101 h1:scTaYzT3NF2JQ1GHkL0PF6MpFVSOJk++fxp/j7UJrOs=
github.com/absmach/supermq v0.16.1-0.20250410145505-299cee777101/go.mod h1:dJqO3luvt+zLVuDhxOdsRRCuv945F86Nf1BXxFro+nU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
+104 -40
View File
@@ -5,6 +5,7 @@ package re
import (
"context"
"encoding/json"
"time"
"github.com/absmach/supermq"
@@ -13,15 +14,16 @@ import (
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/transformers"
mgjson "github.com/absmach/supermq/pkg/transformers/json"
"github.com/absmach/supermq/pkg/transformers/senml"
mgsenml "github.com/absmach/supermq/pkg/transformers/senml"
"github.com/vadv/gopher-lua-libs/argparse"
"github.com/vadv/gopher-lua-libs/base64"
"github.com/vadv/gopher-lua-libs/crypto"
"github.com/vadv/gopher-lua-libs/db"
"github.com/vadv/gopher-lua-libs/filepath"
"github.com/vadv/gopher-lua-libs/ioutil"
"github.com/vadv/gopher-lua-libs/json"
luajson "github.com/vadv/gopher-lua-libs/json"
"github.com/vadv/gopher-lua-libs/regexp"
"github.com/vadv/gopher-lua-libs/storage"
"github.com/vadv/gopher-lua-libs/strings"
@@ -116,6 +118,7 @@ type re struct {
errors chan error
ticker Ticker
email Emailer
ts []transformers.Transformer
}
func NewService(repo Repository, idp supermq.IDProvider, pubSub messaging.PubSub, tck Ticker, emailer Emailer) Service {
@@ -126,6 +129,11 @@ func NewService(repo Repository, idp supermq.IDProvider, pubSub messaging.PubSub
errors: make(chan error),
ticker: tck,
email: emailer,
// Transformers order is important since SenML is also JSON content type.
ts: []transformers.Transformer{
mgsenml.New(mgsenml.JSON),
mgjson.New(nil),
},
}
}
@@ -215,22 +223,16 @@ func (re *re) DisableRule(ctx context.Context, session authn.Session, id string)
}
func (re *re) ConsumeAsync(ctx context.Context, msgs interface{}) {
var inputChannel string
switch m := msgs.(type) {
case *messaging.Message:
inputChannel = m.Channel
case []senml.Message:
if len(m) == 0 {
return
m, ok := msgs.(*messaging.Message)
if !ok {
return
}
inputChannel := m.Channel
for _, t := range re.ts {
if v, err := t.Transform(m); err == nil {
msgs = v
break
}
message := m[0]
inputChannel = message.Channel
case mgjson.Message:
return
default:
return
}
pm := PageMeta{
@@ -242,6 +244,7 @@ func (re *re) ConsumeAsync(ctx context.Context, msgs interface{}) {
re.errors <- err
return
}
for _, r := range page.Rules {
go func(ctx context.Context) {
re.errors <- re.process(ctx, r, msgs)
@@ -259,9 +262,10 @@ func (re *re) process(ctx context.Context, r Rule, msg interface{}) error {
preload(l)
message := l.NewTable()
messages := l.NewTable()
switch m := msg.(type) {
case messaging.Message:
case *messaging.Message:
{
l.RawSet(message, lua.LString("channel"), lua.LString(m.Channel))
l.RawSet(message, lua.LString("subtopic"), lua.LString(m.Subtopic))
@@ -276,36 +280,58 @@ func (re *re) process(ctx context.Context, r Rule, msg interface{}) error {
l.RawSet(message, lua.LString("payload"), pld)
}
case []senml.Message:
msg := m[0]
l.RawSet(message, lua.LString("channel"), lua.LString(msg.Channel))
l.RawSet(message, lua.LString("subtopic"), lua.LString(msg.Subtopic))
l.RawSet(message, lua.LString("publisher"), lua.LString(msg.Publisher))
l.RawSet(message, lua.LString("protocol"), lua.LString(msg.Protocol))
l.RawSet(message, lua.LString("name"), lua.LString(msg.Name))
l.RawSet(message, lua.LString("unit"), lua.LString(msg.Unit))
l.RawSet(message, lua.LString("time"), lua.LNumber(msg.Time))
l.RawSet(message, lua.LString("update_time"), lua.LNumber(msg.UpdateTime))
case []mgsenml.Message:
for i, msg := range m {
insert := l.NewTable()
insert.RawSetString("channel", lua.LString(msg.Channel))
insert.RawSetString("subtopic", lua.LString(msg.Subtopic))
insert.RawSetString("publisher", lua.LString(msg.Publisher))
insert.RawSetString("protocol", lua.LString(msg.Protocol))
insert.RawSetString("name", lua.LString(msg.Name))
insert.RawSetString("unit", lua.LString(msg.Unit))
insert.RawSetString("time", lua.LNumber(msg.Time))
insert.RawSetString("update_time", lua.LNumber(msg.UpdateTime))
if msg.Value != nil {
l.RawSet(message, lua.LString("value"), lua.LNumber(*msg.Value))
if msg.Value != nil {
insert.RawSetString("value", lua.LNumber(*msg.Value))
}
if msg.StringValue != nil {
insert.RawSetString("string_value", lua.LString(*msg.StringValue))
}
if msg.DataValue != nil {
insert.RawSetString("data_value", lua.LString(*msg.DataValue))
}
if msg.BoolValue != nil {
insert.RawSetString("bool_value", lua.LBool(*msg.BoolValue))
}
if msg.Sum != nil {
insert.RawSetString("sum", lua.LNumber(*msg.Sum))
}
messages.RawSetInt(i+1, insert) // Lua index starts at 1.
}
if msg.StringValue != nil {
l.RawSet(message, lua.LString("string_value"), lua.LString(*msg.StringValue))
if len(m) == 1 {
message = messages.RawGetInt(1).(*lua.LTable)
}
if msg.DataValue != nil {
l.RawSet(message, lua.LString("data_value"), lua.LString(*msg.DataValue))
case mgjson.Messages:
for i, msg := range m.Data {
insert := l.NewTable()
insert.RawSetString("channel", lua.LString(msg.Channel))
insert.RawSetString("subtopic", lua.LString(msg.Subtopic))
insert.RawSetString("publisher", lua.LString(msg.Publisher))
insert.RawSetString("protocol", lua.LString(msg.Protocol))
insert.RawSetString("format", lua.LString(m.Format))
traverseJson(l, insert, lua.LString("payload"), map[string]interface{}(msg.Payload))
messages.RawSetInt(i+1, insert) // Lua index starts at 1.
}
if msg.BoolValue != nil {
l.RawSet(message, lua.LString("bool_value"), lua.LBool(*msg.BoolValue))
}
if msg.Sum != nil {
l.RawSet(message, lua.LString("sum"), lua.LNumber(*msg.Sum))
if len(m.Data) == 1 {
message = messages.RawGetInt(1).(*lua.LTable)
}
}
// Set the message object as a Lua global variable.
l.SetGlobal("message", message)
l.SetGlobal("messages", messages)
// set the email function as a Lua global function
l.SetGlobal("send_email", l.NewFunction(re.sendEmail))
@@ -436,7 +462,7 @@ func (re *re) sendEmail(L *lua.LState) int {
func preload(l *lua.LState) {
db.Preload(l)
ioutil.Preload(l)
json.Preload(l)
luajson.Preload(l)
yaml.Preload(l)
crypto.Preload(l)
regexp.Preload(l)
@@ -447,3 +473,41 @@ func preload(l *lua.LState) {
strings.Preload(l)
filepath.Preload(l)
}
func traverseJson(l *lua.LState, parent *lua.LTable, key lua.LValue, value interface{}) {
var lval lua.LValue
switch val := value.(type) {
case string:
lval = lua.LString(val)
case float64:
lval = lua.LNumber(val)
case int:
lval = lua.LNumber(float64(val))
case json.Number:
if num, err := val.Float64(); err != nil {
lval = lua.LNumber(num)
}
case bool:
lval = lua.LBool(val)
case []interface{}:
t := l.NewTable()
for i, j := range val {
traverseJson(l, t, lua.LNumber(i+1), j)
}
lval = t
case map[string]interface{}:
t := l.NewTable()
for k, v := range val {
traverseJson(l, t, lua.LString(k), v)
}
lval = t
case []map[string]interface{}:
t := l.NewTable()
for i, j := range val {
traverseJson(l, t, lua.LNumber(i+1), j)
}
lval = t
}
parent.RawSet(key, lval)
return
}