Files
Steve Munene c2a03e6128 MG-167 - Terminate Alarms message which failed to handle (#191)
* initial implementation

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* update tests

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* update postgres writer

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix readers tests

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* address comments

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix senML naming

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* remove logger from service

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix failing linter

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* terminate message when alarm fails to decode

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* revert changes

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* remove message from logging

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* remove empty line

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* wrap error

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

---------

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>
2025-06-16 16:20:01 +02:00

57 lines
1.3 KiB
Go

// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package consumer
import (
"bytes"
"context"
"encoding/gob"
"log/slog"
"time"
"github.com/absmach/magistrala/alarms"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/messaging"
)
// errFailedToDecode is wrapped around the underlying decoder error when a
// message payload cannot be decoded into an alarm.
var errFailedToDecode = errors.New("failed to decode alarm")
// handler is the message handler returned by NewHandler. It turns incoming
// messages into alarms and hands them to the alarms service.
type handler struct {
	// svc is the alarms service that receives every decoded alarm.
	svc alarms.Service
	// logger is the logger supplied to NewHandler.
	logger *slog.Logger
}
// NewHandler returns a messaging.MessageHandler backed by the given alarms
// service and logger.
func NewHandler(svc alarms.Service, logger *slog.Logger) messaging.MessageHandler {
	h := handler{
		svc:    svc,
		logger: logger,
	}

	return &h
}
// Handle decodes the gob-encoded alarm carried in the payload of msg, stamps
// it with the metadata of the message (domain, channel, publisher, subtopic
// and creation time), validates it and passes it to the alarms service.
//
// A nil message or a nil payload is rejected with a plain error. A payload
// that cannot be gob-decoded is reported as a messaging error built with
// messaging.Term. Errors from alarm validation and from the alarms service
// are returned unchanged.
func (h handler) Handle(msg *messaging.Message) error {
	if msg == nil {
		return errors.New("message is empty")
	}

	payload := msg.GetPayload()
	if payload == nil {
		return errors.New("message payload is empty")
	}

	var alarm alarms.Alarm
	if err := gob.NewDecoder(bytes.NewReader(payload)).Decode(&alarm); err != nil {
		return messaging.NewError(errors.Wrap(errFailedToDecode, err), messaging.Term)
	}

	// These fields are always taken from the message itself, overwriting
	// whatever values the decoded payload carried for them.
	alarm.DomainID = msg.GetDomain()
	alarm.ChannelID = msg.GetChannel()
	alarm.ClientID = msg.GetPublisher()
	alarm.Subtopic = msg.GetSubtopic()
	// The created value is passed as the nanosecond argument of time.Unix.
	alarm.CreatedAt = time.Unix(0, int64(msg.GetCreated()))

	if err := alarm.Validate(); err != nil {
		return err
	}

	// Handle receives no context from its caller, so a background context is
	// used for the service call.
	return h.svc.CreateAlarm(context.Background(), alarm)
}
// Cancel is a no-op for this handler and always returns nil.
func (handler) Cancel() error { return nil }