mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 06:50:18 +00:00
c2a03e6128
* initial implementation Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * update tests Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * update postgres writer Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix readers tests Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * address comments Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix senML naming Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * remove logger from service Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix failing linter Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * terminate message when alarm fails to decode Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * revert changes Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * remove message from logging Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * remove empty line Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * wrap error Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> --------- Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>
57 lines
1.3 KiB
Go
57 lines
1.3 KiB
Go
// Copyright (c) Abstract Machines
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package consumer
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/gob"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/absmach/magistrala/alarms"
|
|
"github.com/absmach/supermq/pkg/errors"
|
|
"github.com/absmach/supermq/pkg/messaging"
|
|
)
|
|
|
|
// errFailedToDecode is wrapped around the underlying decoder error when a
// message payload cannot be gob-decoded into an alarm.
var errFailedToDecode = errors.New("failed to decode alarm")
|
|
|
|
type handler struct {
|
|
svc alarms.Service
|
|
logger *slog.Logger
|
|
}
|
|
|
|
func NewHandler(svc alarms.Service, logger *slog.Logger) messaging.MessageHandler {
|
|
return &handler{svc: svc, logger: logger}
|
|
}
|
|
|
|
func (h handler) Handle(msg *messaging.Message) (err error) {
|
|
if msg == nil {
|
|
return errors.New("message is empty")
|
|
}
|
|
if msg.GetPayload() == nil {
|
|
return errors.New("message payload is empty")
|
|
}
|
|
|
|
var alarm alarms.Alarm
|
|
if err := gob.NewDecoder(bytes.NewReader(msg.GetPayload())).Decode(&alarm); err != nil {
|
|
return messaging.NewError(errors.Wrap(errFailedToDecode, err), messaging.Term)
|
|
}
|
|
alarm.DomainID = msg.GetDomain()
|
|
alarm.ChannelID = msg.GetChannel()
|
|
alarm.ClientID = msg.GetPublisher()
|
|
alarm.Subtopic = msg.GetSubtopic()
|
|
alarm.CreatedAt = time.Unix(0, int64(msg.GetCreated()))
|
|
|
|
if err := alarm.Validate(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return h.svc.CreateAlarm(context.Background(), alarm)
|
|
}
|
|
|
|
func (h handler) Cancel() error {
|
|
return nil
|
|
}
|