diff --git a/alarms/alarms.go b/alarms/alarms.go index ed4cc6647..b89993125 100644 --- a/alarms/alarms.go +++ b/alarms/alarms.go @@ -82,18 +82,12 @@ func (a Alarm) Validate() error { if a.ClientID == "" { return errors.New("client_id is required") } - if a.Subtopic == "" { - return errors.New("subtopic is required") - } if a.Measurement == "" { return errors.New("measurement is required") } if a.Value == "" { return errors.New("value is required") } - if a.Unit == "" { - return errors.New("unit is required") - } if a.Cause == "" { return errors.New("cause is required") } diff --git a/alarms/alarms_test.go b/alarms/alarms_test.go index 2b1d5ac01..2fc3ef72e 100644 --- a/alarms/alarms_test.go +++ b/alarms/alarms_test.go @@ -96,21 +96,6 @@ func TestValidateAlarms(t *testing.T) { }, err: errors.New("client_id is required"), }, - { - desc: "missing subtopic", - alarm: alarms.Alarm{ - RuleID: testsutil.GenerateUUID(t), - DomainID: testsutil.GenerateUUID(t), - ChannelID: testsutil.GenerateUUID(t), - ClientID: testsutil.GenerateUUID(t), - Measurement: "measurement", - Value: "value", - Unit: "unit", - Cause: "cause", - Severity: 100, - }, - err: errors.New("subtopic is required"), - }, { desc: "missing measurement", alarm: alarms.Alarm{ @@ -141,21 +126,6 @@ func TestValidateAlarms(t *testing.T) { }, err: errors.New("value is required"), }, - { - desc: "missing unit", - alarm: alarms.Alarm{ - RuleID: testsutil.GenerateUUID(t), - DomainID: testsutil.GenerateUUID(t), - ChannelID: testsutil.GenerateUUID(t), - ClientID: testsutil.GenerateUUID(t), - Subtopic: "subtopic", - Measurement: "measurement", - Value: "value", - Cause: "cause", - Severity: 100, - }, - err: errors.New("unit is required"), - }, { desc: "missing cause", alarm: alarms.Alarm{ diff --git a/alarms/brokers/brokers_nats.go b/alarms/brokers/brokers_nats.go index cb2874a0f..32d08a1a1 100644 --- a/alarms/brokers/brokers_nats.go +++ b/alarms/brokers/brokers_nats.go @@ -19,7 +19,7 @@ import ( const ( AllTopic = "alarms.>" - prefix = "writers" + prefix = "alarms" ) var cfg = jetstream.StreamConfig{ diff --git a/alarms/brokers/brokers_rabbitmq.go b/alarms/brokers/brokers_rabbitmq.go index 266957549..6e567cca1 100644 --- a/alarms/brokers/brokers_rabbitmq.go +++ b/alarms/brokers/brokers_rabbitmq.go @@ -17,12 +17,12 @@ import ( const ( AllTopic = "alarms.#" - exchangeName = "writers" - prefix = "writers" + exchangeName = "alarms" + prefix = "alarms" ) func NewPubSub(_ context.Context, url string, logger *slog.Logger) (messaging.PubSub, error) { - pb, err := broker.NewPubSub(url, logger, broker.Prefix("writers"), broker.Exchange(exchangeName)) + pb, err := broker.NewPubSub(url, logger, broker.Prefix(prefix), broker.Exchange(exchangeName)) if err != nil { return nil, err } @@ -31,7 +31,7 @@ func NewPubSub(_ context.Context, url string, logger *slog.Logger) (messaging.Pu } func NewPublisher(_ context.Context, url string) (messaging.Publisher, error) { - pb, err := broker.NewPublisher(url, broker.Prefix("writers"), broker.Exchange(exchangeName)) + pb, err := broker.NewPublisher(url, broker.Prefix(prefix), broker.Exchange(exchangeName)) if err != nil { return nil, err } diff --git a/re/bindings.go b/re/bindings.go index 16a983046..8070cf58f 100644 --- a/re/bindings.go +++ b/re/bindings.go @@ -4,9 +4,12 @@ package re import ( + "bytes" "context" + "encoding/gob" "encoding/json" + "github.com/absmach/magistrala/alarms" "github.com/absmach/supermq/pkg/messaging" "github.com/absmach/supermq/pkg/transformers/senml" lua "github.com/yuin/gopher-lua" @@ -63,3 +66,58 @@ func (re *re) sendEmail(L *lua.LState) int { } return 1 } + +func (re *re) sendAlarm(ctx context.Context, ruleID string, original *messaging.Message) lua.LGFunction { + return func(l *lua.LState) int { + processAlarm := func(alarmTable *lua.LTable) int { + val := convertLua(alarmTable) + data, err := json.Marshal(val) + if err != nil { + return 0 + } + + alarm := alarms.Alarm{ + RuleID: ruleID, + DomainID: original.Domain, + ClientID: original.Publisher, + ChannelID: original.Channel, + Subtopic: original.Subtopic, + } + if err := json.Unmarshal(data, &alarm); err != nil { + return 0 + } + + var buf bytes.Buffer + if err := gob.NewEncoder(&buf).Encode(alarm); err != nil { + return 0 + } + + m := &messaging.Message{ + Domain: original.Domain, + Publisher: original.Publisher, + Created: alarm.CreatedAt.UnixNano(), + Channel: original.Channel, + Subtopic: original.Subtopic, + Protocol: original.Protocol, + Payload: buf.Bytes(), + } + + if err := re.alarmsPub.Publish(ctx, original.Channel, m); err != nil { + return 0 + } + return 1 + } + table := l.ToTable(1) + if table.RawGetInt(1) != lua.LNil { + table.ForEach(func(_, value lua.LValue) { + if alarmTable, ok := value.(*lua.LTable); ok { + processAlarm(alarmTable) + } + }) + } else { + processAlarm(table) + } + + return 1 + } +} diff --git a/re/service.go b/re/service.go index 8ac695d3d..5b5732d0d 100644 --- a/re/service.go +++ b/re/service.go @@ -248,6 +248,7 @@ func (re *re) process(ctx context.Context, r Rule, msg *messaging.Message) error // set the email function as a Lua global function. l.SetGlobal("send_email", l.NewFunction(re.sendEmail)) l.SetGlobal("save_senml", l.NewFunction(re.save(ctx, msg))) + l.SetGlobal("send_alarm", l.NewFunction(re.sendAlarm(ctx, r.ID, msg))) if err := l.DoString(string(r.Logic.Value)); err != nil { return err