mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 04:10:34 +00:00
NOISSUE - Add alarms function to send alarm message to NATS (#114)
* add send alarm function Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com> * move to alarm page Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com> * add status column Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com> * update alarms function Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com> * update send alarm Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com> * fix tests Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com> * fix alarms prefix Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com> --------- Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>
This commit is contained in:
committed by
GitHub
parent
aef424c5da
commit
35552c2651
@@ -82,18 +82,12 @@ func (a Alarm) Validate() error {
|
||||
if a.ClientID == "" {
|
||||
return errors.New("client_id is required")
|
||||
}
|
||||
if a.Subtopic == "" {
|
||||
return errors.New("subtopic is required")
|
||||
}
|
||||
if a.Measurement == "" {
|
||||
return errors.New("measurement is required")
|
||||
}
|
||||
if a.Value == "" {
|
||||
return errors.New("value is required")
|
||||
}
|
||||
if a.Unit == "" {
|
||||
return errors.New("unit is required")
|
||||
}
|
||||
if a.Cause == "" {
|
||||
return errors.New("cause is required")
|
||||
}
|
||||
|
||||
@@ -96,21 +96,6 @@ func TestValidateAlarms(t *testing.T) {
|
||||
},
|
||||
err: errors.New("client_id is required"),
|
||||
},
|
||||
{
|
||||
desc: "missing subtopic",
|
||||
alarm: alarms.Alarm{
|
||||
RuleID: testsutil.GenerateUUID(t),
|
||||
DomainID: testsutil.GenerateUUID(t),
|
||||
ChannelID: testsutil.GenerateUUID(t),
|
||||
ClientID: testsutil.GenerateUUID(t),
|
||||
Measurement: "measurement",
|
||||
Value: "value",
|
||||
Unit: "unit",
|
||||
Cause: "cause",
|
||||
Severity: 100,
|
||||
},
|
||||
err: errors.New("subtopic is required"),
|
||||
},
|
||||
{
|
||||
desc: "missing measurement",
|
||||
alarm: alarms.Alarm{
|
||||
@@ -141,21 +126,6 @@ func TestValidateAlarms(t *testing.T) {
|
||||
},
|
||||
err: errors.New("value is required"),
|
||||
},
|
||||
{
|
||||
desc: "missing unit",
|
||||
alarm: alarms.Alarm{
|
||||
RuleID: testsutil.GenerateUUID(t),
|
||||
DomainID: testsutil.GenerateUUID(t),
|
||||
ChannelID: testsutil.GenerateUUID(t),
|
||||
ClientID: testsutil.GenerateUUID(t),
|
||||
Subtopic: "subtopic",
|
||||
Measurement: "measurement",
|
||||
Value: "value",
|
||||
Cause: "cause",
|
||||
Severity: 100,
|
||||
},
|
||||
err: errors.New("unit is required"),
|
||||
},
|
||||
{
|
||||
desc: "missing cause",
|
||||
alarm: alarms.Alarm{
|
||||
|
||||
@@ -19,7 +19,7 @@ import (
|
||||
const (
|
||||
AllTopic = "alarms.>"
|
||||
|
||||
prefix = "writers"
|
||||
prefix = "alarms"
|
||||
)
|
||||
|
||||
var cfg = jetstream.StreamConfig{
|
||||
|
||||
@@ -17,12 +17,12 @@ import (
|
||||
const (
|
||||
AllTopic = "alarms.#"
|
||||
|
||||
exchangeName = "writers"
|
||||
prefix = "writers"
|
||||
exchangeName = "alarms"
|
||||
prefix = "alarms"
|
||||
)
|
||||
|
||||
func NewPubSub(_ context.Context, url string, logger *slog.Logger) (messaging.PubSub, error) {
|
||||
pb, err := broker.NewPubSub(url, logger, broker.Prefix("writers"), broker.Exchange(exchangeName))
|
||||
pb, err := broker.NewPubSub(url, logger, broker.Prefix(prefix), broker.Exchange(exchangeName))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -31,7 +31,7 @@ func NewPubSub(_ context.Context, url string, logger *slog.Logger) (messaging.Pu
|
||||
}
|
||||
|
||||
func NewPublisher(_ context.Context, url string) (messaging.Publisher, error) {
|
||||
pb, err := broker.NewPublisher(url, broker.Prefix("writers"), broker.Exchange(exchangeName))
|
||||
pb, err := broker.NewPublisher(url, broker.Prefix(prefix), broker.Exchange(exchangeName))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -4,9 +4,12 @@
|
||||
package re
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/gob"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/absmach/magistrala/alarms"
|
||||
"github.com/absmach/supermq/pkg/messaging"
|
||||
"github.com/absmach/supermq/pkg/transformers/senml"
|
||||
lua "github.com/yuin/gopher-lua"
|
||||
@@ -63,3 +66,58 @@ func (re *re) sendEmail(L *lua.LState) int {
|
||||
}
|
||||
return 1
|
||||
}
|
||||
|
||||
func (re *re) sendAlarm(ctx context.Context, ruleID string, original *messaging.Message) lua.LGFunction {
|
||||
return func(l *lua.LState) int {
|
||||
processAlarm := func(alarmTable *lua.LTable) int {
|
||||
val := convertLua(alarmTable)
|
||||
data, err := json.Marshal(val)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
alarm := alarms.Alarm{
|
||||
RuleID: ruleID,
|
||||
DomainID: original.Domain,
|
||||
ClientID: original.Publisher,
|
||||
ChannelID: original.Channel,
|
||||
Subtopic: original.Subtopic,
|
||||
}
|
||||
if err := json.Unmarshal(data, &alarm); err != nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
if err := gob.NewEncoder(&buf).Encode(alarm); err != nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
m := &messaging.Message{
|
||||
Domain: original.Domain,
|
||||
Publisher: original.Publisher,
|
||||
Created: alarm.CreatedAt.UnixNano(),
|
||||
Channel: original.Channel,
|
||||
Subtopic: original.Subtopic,
|
||||
Protocol: original.Protocol,
|
||||
Payload: buf.Bytes(),
|
||||
}
|
||||
|
||||
if err := re.alarmsPub.Publish(ctx, original.Channel, m); err != nil {
|
||||
return 0
|
||||
}
|
||||
return 1
|
||||
}
|
||||
table := l.ToTable(1)
|
||||
if table.RawGetInt(1) != lua.LNil {
|
||||
table.ForEach(func(_, value lua.LValue) {
|
||||
if alarmTable, ok := value.(*lua.LTable); ok {
|
||||
processAlarm(alarmTable)
|
||||
}
|
||||
})
|
||||
} else {
|
||||
processAlarm(table)
|
||||
}
|
||||
|
||||
return 1
|
||||
}
|
||||
}
|
||||
|
||||
@@ -248,6 +248,7 @@ func (re *re) process(ctx context.Context, r Rule, msg *messaging.Message) error
|
||||
// set the email function as a Lua global function.
|
||||
l.SetGlobal("send_email", l.NewFunction(re.sendEmail))
|
||||
l.SetGlobal("save_senml", l.NewFunction(re.save(ctx, msg)))
|
||||
l.SetGlobal("send_alarm", l.NewFunction(re.sendAlarm(ctx, r.ID, msg)))
|
||||
|
||||
if err := l.DoString(string(r.Logic.Value)); err != nil {
|
||||
return err
|
||||
|
||||
Reference in New Issue
Block a user