diff --git a/alarms/alarms.go b/alarms/alarms.go index b89993125..30c83baea 100644 --- a/alarms/alarms.go +++ b/alarms/alarms.go @@ -53,20 +53,23 @@ type AlarmsPage struct { } type PageMetadata struct { - Offset uint64 `json:"offset" db:"offset"` - Limit uint64 `json:"limit" db:"limit"` - DomainID string `json:"domain_id" db:"domain_id"` - ChannelID string `json:"channel_id" db:"channel_id"` - ClientID string `json:"client_id" db:"client_id"` - Subtopic string `json:"subtopic" db:"subtopic"` - RuleID string `json:"rule_id" db:"rule_id"` - Status Status `json:"status" db:"status"` - AssigneeID string `json:"assignee_id" db:"assignee_id"` - Severity uint8 `json:"severity" db:"severity"` - UpdatedBy string `json:"updated_by" db:"updated_by"` - AssignedBy string `json:"assigned_by" db:"assigned_by"` - AcknowledgedBy string `json:"acknowledged_by" db:"acknowledged_by"` - ResolvedBy string `json:"resolved_by" db:"resolved_by"` + Offset uint64 `json:"offset" db:"offset"` + Limit uint64 `json:"limit" db:"limit"` + DomainID string `json:"domain_id" db:"domain_id"` + RuleID string `json:"rule_id" db:"rule_id"` + ChannelID string `json:"channel_id" db:"channel_id"` + ClientID string `json:"client_id" db:"client_id"` + Subtopic string `json:"subtopic" db:"subtopic"` + Measurement string `json:"measurement" db:"measurement"` + Status Status `json:"status" db:"status"` + CreatedFrom time.Time `json:"created_from" db:"created_from"` + CreatedTo time.Time `json:"created_to" db:"created_to"` + AssigneeID string `json:"assignee_id" db:"assignee_id"` + Severity uint8 `json:"severity" db:"severity"` + UpdatedBy string `json:"updated_by" db:"updated_by"` + AssignedBy string `json:"assigned_by" db:"assigned_by"` + AcknowledgedBy string `json:"acknowledged_by" db:"acknowledged_by"` + ResolvedBy string `json:"resolved_by" db:"resolved_by"` } func (a Alarm) Validate() error { diff --git a/alarms/api/transport.go b/alarms/api/transport.go index 54e2d9118..68de646c4 100644 --- a/alarms/api/transport.go +++ b/alarms/api/transport.go @@ -10,6 +10,7 @@ import ( "math" "net/http" "strings" + "time" "github.com/absmach/magistrala/alarms" "github.com/absmach/supermq" @@ -131,6 +132,26 @@ func decodeListAlarmsReq(_ context.Context, r *http.Request) (interface{}, error if err != nil { return listAlarmsReq{}, errors.Wrap(apiutil.ErrValidation, err) } + cfrom, err := apiutil.ReadStringQuery(r, "created_from", "") + if err != nil { + return listAlarmsReq{}, errors.Wrap(apiutil.ErrValidation, err) + } + cto, err := apiutil.ReadStringQuery(r, "created_to", "") + if err != nil { + return listAlarmsReq{}, errors.Wrap(apiutil.ErrValidation, err) + } + + var createdFrom, createdTo time.Time + if cfrom != "" { + if createdFrom, err = time.Parse(time.RFC3339, cfrom); err != nil { + return listAlarmsReq{}, errors.Wrap(apiutil.ErrValidation, err) + } + } + if cto != "" { + if createdTo, err = time.Parse(time.RFC3339, cto); err != nil { + return listAlarmsReq{}, errors.Wrap(apiutil.ErrValidation, err) + } + } return listAlarmsReq{ PageMetadata: alarms.PageMetadata{ @@ -148,6 +169,8 @@ func decodeListAlarmsReq(_ context.Context, r *http.Request) (interface{}, error UpdatedBy: updatedBy, AcknowledgedBy: acknowledgedBy, AssignedBy: assignedBy, + CreatedFrom: createdFrom, + CreatedTo: createdTo, }, }, nil } diff --git a/alarms/consumer/consumer.go b/alarms/consumer/consumer.go index 8b51979f7..7cb14f22d 100644 --- a/alarms/consumer/consumer.go +++ b/alarms/consumer/consumer.go @@ -40,10 +40,7 @@ func (h handler) Handle(msg *messaging.Message) (err error) { alarm.ChannelID = msg.GetChannel() alarm.ClientID = msg.GetPublisher() alarm.Subtopic = msg.GetSubtopic() - - if alarm.CreatedAt.IsZero() { - alarm.CreatedAt = time.Unix(0, int64(msg.GetCreated())) - } + alarm.CreatedAt = time.Unix(0, int64(msg.GetCreated())) if err := alarm.Validate(); err != nil { return err diff --git a/alarms/middleware/logging.go b/alarms/middleware/logging.go index c8e265358..f535c304a 100644 --- a/alarms/middleware/logging.go +++ b/alarms/middleware/logging.go @@ -41,9 +41,10 @@ func (lm *loggingMiddleware) CreateAlarm(ctx context.Context, alarm alarms.Alarm slog.String("measurement", alarm.Measurement), slog.String("value", alarm.Value), slog.String("unit", alarm.Unit), + slog.Uint64("status", uint64(alarm.Status)), + slog.Uint64("severity", uint64(alarm.Severity)), slog.String("threshold", alarm.Threshold), slog.String("cause", alarm.Cause), - slog.Uint64("severity", uint64(alarm.Severity)), ), } if err != nil { @@ -51,7 +52,9 @@ func (lm *loggingMiddleware) CreateAlarm(ctx context.Context, alarm alarms.Alarm lm.logger.Warn("Create alarm failed", args...) return } - lm.logger.Info("Create alarm completed successfully", args...) + if alarm.ID != "" { + lm.logger.Info("Create alarm completed successfully", args...) + } }(time.Now()) return lm.service.CreateAlarm(ctx, alarm) @@ -72,10 +75,10 @@ func (lm *loggingMiddleware) UpdateAlarm(ctx context.Context, session authn.Sess slog.String("measurement", dba.Measurement), slog.String("value", dba.Value), slog.String("unit", dba.Unit), + slog.String("status", dba.Status.String()), + slog.Uint64("severity", uint64(dba.Severity)), slog.String("threshold", dba.Threshold), slog.String("cause", dba.Cause), - slog.Uint64("severity", uint64(dba.Severity)), - slog.String("status", dba.Status.String()), ), } if err != nil { @@ -119,6 +122,7 @@ func (lm *loggingMiddleware) ListAlarms(ctx context.Context, session authn.Sessi slog.String("channel_id", pm.ChannelID), slog.String("client_id", pm.ClientID), slog.String("subtopic", pm.Subtopic), + slog.String("status", pm.Status.String()), slog.Uint64("severity", uint64(pm.Severity)), } if err != nil { diff --git a/alarms/postgres/alarms.go b/alarms/postgres/alarms.go index ddd85e127..466001b98 100644 --- a/alarms/postgres/alarms.go +++ b/alarms/postgres/alarms.go @@ -30,9 +30,48 @@ func NewAlarmsRepo(db *sqlx.DB) alarms.Repository { } func (r *repository) CreateAlarm(ctx context.Context, alarm alarms.Alarm) (alarms.Alarm, error) { - query := `INSERT INTO alarms (id, rule_id, domain_id, channel_id, client_id, subtopic, measurement, value, unit, threshold, cause, status, severity, assignee_id, metadata, created_at) - VALUES (:id, :rule_id, :domain_id, :channel_id, :client_id, :subtopic, :measurement, :value, :unit, :threshold, :cause, :status, :severity, :assignee_id, :metadata, :created_at) - RETURNING id, rule_id, domain_id, channel_id, client_id, subtopic, measurement, value, unit, threshold, cause, status, severity, assignee_id, metadata, created_at;` + query := ` + WITH existing AS ( + SELECT status, severity + FROM alarms + WHERE domain_id = :domain_id + AND rule_id = :rule_id + AND channel_id = :channel_id + AND client_id = :client_id + AND subtopic = :subtopic + AND measurement = :measurement + AND created_at <= :created_at + ORDER BY created_at DESC + LIMIT 1 + ) + INSERT INTO alarms ( + id, rule_id, domain_id, channel_id, client_id, subtopic, measurement, + value, unit, threshold, cause, status, severity, assignee_id, + created_at, updated_at, updated_by, assigned_at, assigned_by, + acknowledged_at, acknowledged_by, resolved_at, resolved_by, metadata + ) + SELECT + :id, :rule_id, :domain_id, :channel_id, :client_id, :subtopic, :measurement, + :value, :unit, :threshold, :cause, :status, :severity, :assignee_id, + :created_at, :updated_at, :updated_by, :assigned_at, :assigned_by, + :acknowledged_at, :acknowledged_by, :resolved_at, :resolved_by, :metadata + WHERE ( + EXISTS ( + SELECT 1 FROM existing + WHERE existing.status IS DISTINCT FROM :status + OR (:status = 0 AND existing.status = 0 AND existing.severity IS DISTINCT FROM :severity) + ) + OR ( + NOT EXISTS (SELECT 1 FROM existing) AND :status = 0 + ) + ) + RETURNING + id, rule_id, domain_id, channel_id, client_id, subtopic, measurement, + value, unit, threshold, cause, status, severity, created_at, + assignee_id, updated_at, updated_by, assigned_at, assigned_by, + acknowledged_at, acknowledged_by, resolved_at, resolved_by, metadata + ; + ` dba, err := toDBAlarm(alarm) if err != nil { return alarms.Alarm{}, errors.Wrap(repoerr.ErrCreateEntity, err) @@ -147,7 +186,11 @@ func (r *repository) ListAlarms(ctx context.Context, pm alarms.PageMetadata) (al return alarms.AlarmsPage{}, errors.Wrap(repoerr.ErrViewEntity, err) } - q := fmt.Sprintf(`SELECT * FROM alarms %s ORDER BY created_at DESC LIMIT :limit OFFSET :offset;`, query) + q := fmt.Sprintf(`SELECT id, rule_id, domain_id, channel_id, client_id, subtopic, measurement, value, unit, + threshold, cause, status, severity, assignee_id, created_at, updated_at, updated_by, assigned_at, + assigned_by, acknowledged_at, acknowledged_by, resolved_at, resolved_by, metadata + FROM alarms %s ORDER BY created_at DESC LIMIT :limit OFFSET :offset;`, query) + rows, err := r.db.NamedQueryContext(ctx, q, pm) if err != nil { return alarms.AlarmsPage{}, errors.Wrap(repoerr.ErrViewEntity, err) @@ -384,27 +427,30 @@ func pageQuery(pm alarms.PageMetadata) (string, error) { if pm.DomainID != "" { query = append(query, "domain_id = :domain_id") } + if pm.RuleID != "" { + query = append(query, "rule_id = :rule_id") + } if pm.ChannelID != "" { query = append(query, "channel_id = :channel_id") } - if pm.ClientID != "" { - query = append(query, "client_id = :client_id") - } if pm.Subtopic != "" { query = append(query, "subtopic = :subtopic") } - if pm.RuleID != "" { - query = append(query, "rule_id = :rule_id") + if pm.ClientID != "" { + query = append(query, "client_id = :client_id") + } + if pm.Measurement != "" { + query = append(query, "measurement = :measurement") } if pm.Status != alarms.AllStatus { query = append(query, "status = :status") } - if pm.AssigneeID != "" { - query = append(query, "assignee_id = :assignee_id") - } if pm.Severity != math.MaxUint8 { query = append(query, "severity = :severity") } + if pm.AssigneeID != "" { + query = append(query, "assignee_id = :assignee_id") + } if pm.UpdatedBy != "" { query = append(query, "updated_by = :updated_by") } @@ -417,6 +463,12 @@ func pageQuery(pm alarms.PageMetadata) (string, error) { if pm.AssignedBy != "" { query = append(query, "assigned_by = :assigned_by") } + if !pm.CreatedFrom.IsZero() { + query = append(query, "created_at >= :created_from") + } + if !pm.CreatedTo.IsZero() { + query = append(query, "created_at <= :created_to") + } var emq string if len(query) > 0 { diff --git a/alarms/postgres/alarms_test.go b/alarms/postgres/alarms_test.go index b435da524..e858fb4b1 100644 --- a/alarms/postgres/alarms_test.go +++ b/alarms/postgres/alarms_test.go @@ -66,7 +66,7 @@ func TestCreateAlarm(t *testing.T) { { desc: "duplicate alarm", alarm: alarm, - err: repoerr.ErrConflict, + err: repoerr.ErrNotFound, }, { desc: "missing rule id", diff --git a/alarms/postgres/init.go b/alarms/postgres/init.go index 6ccb3ab44..e25b97b60 100644 --- a/alarms/postgres/init.go +++ b/alarms/postgres/init.go @@ -21,8 +21,8 @@ func Migration() *migrate.MemoryMigrationSource { rule_id VARCHAR(36) NOT NULL CHECK (length(rule_id) > 0), domain_id VARCHAR(36) NOT NULL, channel_id VARCHAR(36) NOT NULL, - client_id VARCHAR(36) NOT NULL, subtopic TEXT NOT NULL, + client_id VARCHAR(36) NOT NULL, measurement TEXT NOT NULL, value TEXT NOT NULL, unit TEXT NOT NULL, @@ -42,6 +42,7 @@ func Migration() *migrate.MemoryMigrationSource { resolved_by VARCHAR(36) NULL, metadata JSONB );`, + "CREATE INDEX IF NOT EXISTS idx_alarms_state ON alarms (domain_id, rule_id, channel_id, subtopic, client_id, measurement, created_at DESC);", }, Down: []string{ `DROP TABLE IF EXISTS alarms`, diff --git a/alarms/service.go b/alarms/service.go index c4bf28304..2ae4f8c80 100644 --- a/alarms/service.go +++ b/alarms/service.go @@ -9,6 +9,7 @@ import ( "github.com/absmach/supermq" "github.com/absmach/supermq/pkg/authn" + repoerr "github.com/absmach/supermq/pkg/errors/repository" ) type service struct { @@ -39,29 +40,10 @@ func (s *service) CreateAlarm(ctx context.Context, alarm Alarm) error { return err } - pm := PageMetadata{ - Limit: 1, - Offset: 0, - DomainID: alarm.DomainID, - ChannelID: alarm.ChannelID, - ClientID: alarm.ClientID, - Subtopic: alarm.Subtopic, - RuleID: alarm.RuleID, - Severity: alarm.Severity, - Status: alarm.Status, - } - lastAlarms, err := s.repo.ListAlarms(ctx, pm) - if err != nil { + if _, err = s.repo.CreateAlarm(ctx, alarm); err != nil && err != repoerr.ErrNotFound { return err } - - if len(lastAlarms.Alarms) > 0 { - return nil - } - - _, err = s.repo.CreateAlarm(ctx, alarm) - - return err + return nil } func (s *service) ViewAlarm(ctx context.Context, session authn.Session, alarmID string) (Alarm, error) { diff --git a/alarms/service_test.go b/alarms/service_test.go index 87372c134..ed721f687 100644 --- a/alarms/service_test.go +++ b/alarms/service_test.go @@ -6,7 +6,9 @@ package alarms_test import ( "context" "fmt" + "math" "testing" + "time" "github.com/absmach/magistrala/alarms" "github.com/absmach/magistrala/alarms/mocks" @@ -23,7 +25,7 @@ var idp = uuid.New() func TestCreateAlarm(t *testing.T) { repo := new(mocks.Repository) svc := alarms.NewService(idp, repo) - + ts := time.Now() cases := []struct { desc string alarm alarms.Alarm @@ -42,6 +44,7 @@ func TestCreateAlarm(t *testing.T) { Unit: "unit", Cause: "cause", Severity: 100, + CreatedAt: ts, }, err: nil, }, @@ -57,6 +60,7 @@ func TestCreateAlarm(t *testing.T) { Unit: "unit", Cause: "cause", Severity: 100, + CreatedAt: ts, }, err: errors.New("rule_id is required"), }, @@ -67,13 +71,15 @@ func TestCreateAlarm(t *testing.T) { repoCall := repo.On("CreateAlarm", context.Background(), mock.Anything).Return(tc.alarm, tc.err) repoCall1 := repo.On("ListAlarms", context.Background(), alarms.PageMetadata{ Offset: 0, Limit: 1, - DomainID: tc.alarm.DomainID, - ChannelID: tc.alarm.ChannelID, - ClientID: tc.alarm.ClientID, - Subtopic: tc.alarm.Subtopic, - RuleID: tc.alarm.RuleID, - Severity: tc.alarm.Severity, - Status: tc.alarm.Status, + DomainID: tc.alarm.DomainID, + ChannelID: tc.alarm.ChannelID, + ClientID: tc.alarm.ClientID, + Subtopic: tc.alarm.Subtopic, + Measurement: tc.alarm.Measurement, + RuleID: tc.alarm.RuleID, + Status: alarms.AllStatus, + Severity: math.MaxUint8, + CreatedTo: tc.alarm.CreatedAt, }).Return(alarms.AlarmsPage{}, tc.err) err := svc.CreateAlarm(context.Background(), tc.alarm) if tc.err != nil { diff --git a/alarms/status.go b/alarms/status.go index 90a6f7169..92e03157b 100644 --- a/alarms/status.go +++ b/alarms/status.go @@ -43,7 +43,7 @@ func (s Status) String() string { // ToStatus converts string value to a valid Alarm status. func ToStatus(status string) (Status, error) { - switch status { + switch strings.ToLower(status) { case Active: return ActiveStatus, nil case Cleared: diff --git a/cmd/alarms/main.go b/cmd/alarms/main.go index 7dfc2dad3..39fc3bdf7 100644 --- a/cmd/alarms/main.go +++ b/cmd/alarms/main.go @@ -170,6 +170,7 @@ func main() { Topic: brokers.AllTopic, DeliveryPolicy: messaging.DeliverAllPolicy, Handler: consumer, + AckErr: true, } if err := pubSub.Subscribe(ctx, subCfg); err != nil { logger.Error(fmt.Sprintf("failed to subscribe to message broker: %s", err)) diff --git a/re/bindings.go b/re/bindings.go index 9fc2a0527..305a0db9d 100644 --- a/re/bindings.go +++ b/re/bindings.go @@ -61,7 +61,7 @@ func (re *re) sendAlarm(ctx context.Context, ruleID string, original *messaging. m := &messaging.Message{ Domain: original.Domain, Publisher: original.Publisher, - Created: alarm.CreatedAt.UnixNano(), + Created: original.Created, Channel: original.Channel, Subtopic: original.Subtopic, Protocol: original.Protocol,