NOISSUE - Add Measurement to Creation of alarm (#158)

* fix alarm logic

Signed-off-by: Arvindh <arvindh91@gmail.com>

* rename created_till to created_to

Signed-off-by: Arvindh <arvindh91@gmail.com>

* move alarm logic to sql

Signed-off-by: Arvindh <arvindh91@gmail.com>

---------

Signed-off-by: Arvindh <arvindh91@gmail.com>
This commit is contained in:
Arvindh
2025-05-19 17:19:28 +05:30
committed by GitHub
parent 5d7d4b842d
commit e10a7646cd
12 changed files with 136 additions and 67 deletions
+17 -14
View File
@@ -53,20 +53,23 @@ type AlarmsPage struct {
}
type PageMetadata struct {
Offset uint64 `json:"offset" db:"offset"`
Limit uint64 `json:"limit" db:"limit"`
DomainID string `json:"domain_id" db:"domain_id"`
ChannelID string `json:"channel_id" db:"channel_id"`
ClientID string `json:"client_id" db:"client_id"`
Subtopic string `json:"subtopic" db:"subtopic"`
RuleID string `json:"rule_id" db:"rule_id"`
Status Status `json:"status" db:"status"`
AssigneeID string `json:"assignee_id" db:"assignee_id"`
Severity uint8 `json:"severity" db:"severity"`
UpdatedBy string `json:"updated_by" db:"updated_by"`
AssignedBy string `json:"assigned_by" db:"assigned_by"`
AcknowledgedBy string `json:"acknowledged_by" db:"acknowledged_by"`
ResolvedBy string `json:"resolved_by" db:"resolved_by"`
Offset uint64 `json:"offset" db:"offset"`
Limit uint64 `json:"limit" db:"limit"`
DomainID string `json:"domain_id" db:"domain_id"`
RuleID string `json:"rule_id" db:"rule_id"`
ChannelID string `json:"channel_id" db:"channel_id"`
ClientID string `json:"client_id" db:"client_id"`
Subtopic string `json:"subtopic" db:"subtopic"`
Measurement string `json:"measurement" db:"measurement"`
Status Status `json:"status" db:"status"`
CreatedFrom time.Time `json:"created_from" db:"created_from"`
CreatedTo time.Time `json:"created_to" db:"created_to"`
AssigneeID string `json:"assignee_id" db:"assignee_id"`
Severity uint8 `json:"severity" db:"severity"`
UpdatedBy string `json:"updated_by" db:"updated_by"`
AssignedBy string `json:"assigned_by" db:"assigned_by"`
AcknowledgedBy string `json:"acknowledged_by" db:"acknowledged_by"`
ResolvedBy string `json:"resolved_by" db:"resolved_by"`
}
func (a Alarm) Validate() error {
+23
View File
@@ -10,6 +10,7 @@ import (
"math"
"net/http"
"strings"
"time"
"github.com/absmach/magistrala/alarms"
"github.com/absmach/supermq"
@@ -131,6 +132,26 @@ func decodeListAlarmsReq(_ context.Context, r *http.Request) (interface{}, error
if err != nil {
return listAlarmsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}
cfrom, err := apiutil.ReadStringQuery(r, "created_from", "")
if err != nil {
return listAlarmsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}
cto, err := apiutil.ReadStringQuery(r, "created_to", "")
if err != nil {
return listAlarmsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}
var createdFrom, createdTo time.Time
if cfrom != "" {
if createdFrom, err = time.Parse(time.RFC3339, cfrom); err != nil {
return listAlarmsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}
}
if cto != "" {
if createdTo, err = time.Parse(time.RFC3339, cto); err != nil {
return listAlarmsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}
}
return listAlarmsReq{
PageMetadata: alarms.PageMetadata{
@@ -148,6 +169,8 @@ func decodeListAlarmsReq(_ context.Context, r *http.Request) (interface{}, error
UpdatedBy: updatedBy,
AcknowledgedBy: acknowledgedBy,
AssignedBy: assignedBy,
CreatedFrom: createdFrom,
CreatedTo: createdTo,
},
}, nil
}
+1 -4
View File
@@ -40,10 +40,7 @@ func (h handler) Handle(msg *messaging.Message) (err error) {
alarm.ChannelID = msg.GetChannel()
alarm.ClientID = msg.GetPublisher()
alarm.Subtopic = msg.GetSubtopic()
if alarm.CreatedAt.IsZero() {
alarm.CreatedAt = time.Unix(0, int64(msg.GetCreated()))
}
alarm.CreatedAt = time.Unix(0, int64(msg.GetCreated()))
if err := alarm.Validate(); err != nil {
return err
+8 -4
View File
@@ -41,9 +41,10 @@ func (lm *loggingMiddleware) CreateAlarm(ctx context.Context, alarm alarms.Alarm
slog.String("measurement", alarm.Measurement),
slog.String("value", alarm.Value),
slog.String("unit", alarm.Unit),
slog.Uint64("status", uint64(alarm.Status)),
slog.Uint64("severity", uint64(alarm.Severity)),
slog.String("threshold", alarm.Threshold),
slog.String("cause", alarm.Cause),
slog.Uint64("severity", uint64(alarm.Severity)),
),
}
if err != nil {
@@ -51,7 +52,9 @@ func (lm *loggingMiddleware) CreateAlarm(ctx context.Context, alarm alarms.Alarm
lm.logger.Warn("Create alarm failed", args...)
return
}
lm.logger.Info("Create alarm completed successfully", args...)
if alarm.ID != "" {
lm.logger.Info("Create alarm completed successfully", args...)
}
}(time.Now())
return lm.service.CreateAlarm(ctx, alarm)
@@ -72,10 +75,10 @@ func (lm *loggingMiddleware) UpdateAlarm(ctx context.Context, session authn.Sess
slog.String("measurement", dba.Measurement),
slog.String("value", dba.Value),
slog.String("unit", dba.Unit),
slog.String("status", dba.Status.String()),
slog.Uint64("severity", uint64(dba.Severity)),
slog.String("threshold", dba.Threshold),
slog.String("cause", dba.Cause),
slog.Uint64("severity", uint64(dba.Severity)),
slog.String("status", dba.Status.String()),
),
}
if err != nil {
@@ -119,6 +122,7 @@ func (lm *loggingMiddleware) ListAlarms(ctx context.Context, session authn.Sessi
slog.String("channel_id", pm.ChannelID),
slog.String("client_id", pm.ClientID),
slog.String("subtopic", pm.Subtopic),
slog.String("status", pm.Status.String()),
slog.Uint64("severity", uint64(pm.Severity)),
}
if err != nil {
+64 -12
View File
@@ -30,9 +30,48 @@ func NewAlarmsRepo(db *sqlx.DB) alarms.Repository {
}
func (r *repository) CreateAlarm(ctx context.Context, alarm alarms.Alarm) (alarms.Alarm, error) {
query := `INSERT INTO alarms (id, rule_id, domain_id, channel_id, client_id, subtopic, measurement, value, unit, threshold, cause, status, severity, assignee_id, metadata, created_at)
VALUES (:id, :rule_id, :domain_id, :channel_id, :client_id, :subtopic, :measurement, :value, :unit, :threshold, :cause, :status, :severity, :assignee_id, :metadata, :created_at)
RETURNING id, rule_id, domain_id, channel_id, client_id, subtopic, measurement, value, unit, threshold, cause, status, severity, assignee_id, metadata, created_at;`
query := `
WITH existing AS (
SELECT status, severity
FROM alarms
WHERE domain_id = :domain_id
AND rule_id = :rule_id
AND channel_id = :channel_id
AND client_id = :client_id
AND subtopic = :subtopic
AND measurement = :measurement
AND created_at <= :created_at
ORDER BY created_at DESC
LIMIT 1
)
INSERT INTO alarms (
id, rule_id, domain_id, channel_id, client_id, subtopic, measurement,
value, unit, threshold, cause, status, severity, assignee_id,
created_at, updated_at, updated_by, assigned_at, assigned_by,
acknowledged_at, acknowledged_by, resolved_at, resolved_by, metadata
)
SELECT
:id, :rule_id, :domain_id, :channel_id, :client_id, :subtopic, :measurement,
:value, :unit, :threshold, :cause, :status, :severity, :assignee_id,
:created_at, :updated_at, :updated_by, :assigned_at, :assigned_by,
:acknowledged_at, :acknowledged_by, :resolved_at, :resolved_by, :metadata
WHERE (
EXISTS (
SELECT 1 FROM existing
WHERE existing.status IS DISTINCT FROM :status
OR (:status = 0 AND existing.status = 0 AND existing.severity IS DISTINCT FROM :severity)
)
OR (
NOT EXISTS (SELECT 1 FROM existing) AND :status = 0
)
)
RETURNING
id, rule_id, domain_id, channel_id, client_id, subtopic, measurement,
value, unit, threshold, cause, status, severity, created_at,
assignee_id, updated_at, updated_by, assigned_at, assigned_by,
acknowledged_at, acknowledged_by, resolved_at, resolved_by, metadata
;
`
dba, err := toDBAlarm(alarm)
if err != nil {
return alarms.Alarm{}, errors.Wrap(repoerr.ErrCreateEntity, err)
@@ -147,7 +186,11 @@ func (r *repository) ListAlarms(ctx context.Context, pm alarms.PageMetadata) (al
return alarms.AlarmsPage{}, errors.Wrap(repoerr.ErrViewEntity, err)
}
q := fmt.Sprintf(`SELECT * FROM alarms %s ORDER BY created_at DESC LIMIT :limit OFFSET :offset;`, query)
q := fmt.Sprintf(`SELECT id, rule_id, domain_id, channel_id, client_id, subtopic, measurement, value, unit,
threshold, cause, status, severity, assignee_id, created_at, updated_at, updated_by, assigned_at,
assigned_by, acknowledged_at, acknowledged_by, resolved_at, resolved_by, metadata
FROM alarms %s ORDER BY created_at DESC LIMIT :limit OFFSET :offset;`, query)
rows, err := r.db.NamedQueryContext(ctx, q, pm)
if err != nil {
return alarms.AlarmsPage{}, errors.Wrap(repoerr.ErrViewEntity, err)
@@ -384,27 +427,30 @@ func pageQuery(pm alarms.PageMetadata) (string, error) {
if pm.DomainID != "" {
query = append(query, "domain_id = :domain_id")
}
if pm.RuleID != "" {
query = append(query, "rule_id = :rule_id")
}
if pm.ChannelID != "" {
query = append(query, "channel_id = :channel_id")
}
if pm.ClientID != "" {
query = append(query, "client_id = :client_id")
}
if pm.Subtopic != "" {
query = append(query, "subtopic = :subtopic")
}
if pm.RuleID != "" {
query = append(query, "rule_id = :rule_id")
if pm.ClientID != "" {
query = append(query, "client_id = :client_id")
}
if pm.Measurement != "" {
query = append(query, "measurement = :measurement")
}
if pm.Status != alarms.AllStatus {
query = append(query, "status = :status")
}
if pm.AssigneeID != "" {
query = append(query, "assignee_id = :assignee_id")
}
if pm.Severity != math.MaxUint8 {
query = append(query, "severity = :severity")
}
if pm.AssigneeID != "" {
query = append(query, "assignee_id = :assignee_id")
}
if pm.UpdatedBy != "" {
query = append(query, "updated_by = :updated_by")
}
@@ -417,6 +463,12 @@ func pageQuery(pm alarms.PageMetadata) (string, error) {
if pm.AssignedBy != "" {
query = append(query, "assigned_by = :assigned_by")
}
if !pm.CreatedFrom.IsZero() {
query = append(query, "created_at >= :created_from")
}
if !pm.CreatedTo.IsZero() {
query = append(query, "created_at <= :created_to")
}
var emq string
if len(query) > 0 {
+1 -1
View File
@@ -66,7 +66,7 @@ func TestCreateAlarm(t *testing.T) {
{
desc: "duplicate alarm",
alarm: alarm,
err: repoerr.ErrConflict,
err: repoerr.ErrNotFound,
},
{
desc: "missing rule id",
+2 -1
View File
@@ -21,8 +21,8 @@ func Migration() *migrate.MemoryMigrationSource {
rule_id VARCHAR(36) NOT NULL CHECK (length(rule_id) > 0),
domain_id VARCHAR(36) NOT NULL,
channel_id VARCHAR(36) NOT NULL,
client_id VARCHAR(36) NOT NULL,
subtopic TEXT NOT NULL,
client_id VARCHAR(36) NOT NULL,
measurement TEXT NOT NULL,
value TEXT NOT NULL,
unit TEXT NOT NULL,
@@ -42,6 +42,7 @@ func Migration() *migrate.MemoryMigrationSource {
resolved_by VARCHAR(36) NULL,
metadata JSONB
);`,
"CREATE INDEX IF NOT EXISTS idx_alarms_state ON alarms (domain_id, rule_id, channel_id, subtopic, client_id, measurement, created_at DESC);",
},
Down: []string{
`DROP TABLE IF EXISTS alarms`,
+3 -21
View File
@@ -9,6 +9,7 @@ import (
"github.com/absmach/supermq"
"github.com/absmach/supermq/pkg/authn"
repoerr "github.com/absmach/supermq/pkg/errors/repository"
)
type service struct {
@@ -39,29 +40,10 @@ func (s *service) CreateAlarm(ctx context.Context, alarm Alarm) error {
return err
}
pm := PageMetadata{
Limit: 1,
Offset: 0,
DomainID: alarm.DomainID,
ChannelID: alarm.ChannelID,
ClientID: alarm.ClientID,
Subtopic: alarm.Subtopic,
RuleID: alarm.RuleID,
Severity: alarm.Severity,
Status: alarm.Status,
}
lastAlarms, err := s.repo.ListAlarms(ctx, pm)
if err != nil {
if _, err = s.repo.CreateAlarm(ctx, alarm); err != nil && err != repoerr.ErrNotFound {
return err
}
if len(lastAlarms.Alarms) > 0 {
return nil
}
_, err = s.repo.CreateAlarm(ctx, alarm)
return err
return nil
}
func (s *service) ViewAlarm(ctx context.Context, session authn.Session, alarmID string) (Alarm, error) {
+14 -8
View File
@@ -6,7 +6,9 @@ package alarms_test
import (
"context"
"fmt"
"math"
"testing"
"time"
"github.com/absmach/magistrala/alarms"
"github.com/absmach/magistrala/alarms/mocks"
@@ -23,7 +25,7 @@ var idp = uuid.New()
func TestCreateAlarm(t *testing.T) {
repo := new(mocks.Repository)
svc := alarms.NewService(idp, repo)
ts := time.Now()
cases := []struct {
desc string
alarm alarms.Alarm
@@ -42,6 +44,7 @@ func TestCreateAlarm(t *testing.T) {
Unit: "unit",
Cause: "cause",
Severity: 100,
CreatedAt: ts,
},
err: nil,
},
@@ -57,6 +60,7 @@ func TestCreateAlarm(t *testing.T) {
Unit: "unit",
Cause: "cause",
Severity: 100,
CreatedAt: ts,
},
err: errors.New("rule_id is required"),
},
@@ -67,13 +71,15 @@ func TestCreateAlarm(t *testing.T) {
repoCall := repo.On("CreateAlarm", context.Background(), mock.Anything).Return(tc.alarm, tc.err)
repoCall1 := repo.On("ListAlarms", context.Background(), alarms.PageMetadata{
Offset: 0, Limit: 1,
DomainID: tc.alarm.DomainID,
ChannelID: tc.alarm.ChannelID,
ClientID: tc.alarm.ClientID,
Subtopic: tc.alarm.Subtopic,
RuleID: tc.alarm.RuleID,
Severity: tc.alarm.Severity,
Status: tc.alarm.Status,
DomainID: tc.alarm.DomainID,
ChannelID: tc.alarm.ChannelID,
ClientID: tc.alarm.ClientID,
Subtopic: tc.alarm.Subtopic,
Measurement: tc.alarm.Measurement,
RuleID: tc.alarm.RuleID,
Status: alarms.AllStatus,
Severity: math.MaxUint8,
CreatedTo: tc.alarm.CreatedAt,
}).Return(alarms.AlarmsPage{}, tc.err)
err := svc.CreateAlarm(context.Background(), tc.alarm)
if tc.err != nil {
+1 -1
View File
@@ -43,7 +43,7 @@ func (s Status) String() string {
// ToStatus converts string value to a valid Alarm status.
func ToStatus(status string) (Status, error) {
switch status {
switch strings.ToLower(status) {
case Active:
return ActiveStatus, nil
case Cleared:
+1
View File
@@ -170,6 +170,7 @@ func main() {
Topic: brokers.AllTopic,
DeliveryPolicy: messaging.DeliverAllPolicy,
Handler: consumer,
AckErr: true,
}
if err := pubSub.Subscribe(ctx, subCfg); err != nil {
logger.Error(fmt.Sprintf("failed to subscribe to message broker: %s", err))
+1 -1
View File
@@ -61,7 +61,7 @@ func (re *re) sendAlarm(ctx context.Context, ruleID string, original *messaging.
m := &messaging.Message{
Domain: original.Domain,
Publisher: original.Publisher,
Created: alarm.CreatedAt.UnixNano(),
Created: original.Created,
Channel: original.Channel,
Subtopic: original.Subtopic,
Protocol: original.Protocol,