mirror of
https://github.com/absmach/magistrala.git
synced 2026-06-23 04:10:28 +00:00
61d0427898
Signed-off-by: dusan <borovcanindusan1@gmail.com>
158 lines
3.9 KiB
Go
158 lines
3.9 KiB
Go
// Copyright (c) Abstract Machines
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package consumers
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"os"
|
|
"strings"
|
|
|
|
apiutil "github.com/absmach/magistrala/api/http/util"
|
|
"github.com/absmach/magistrala/pkg/errors"
|
|
"github.com/absmach/magistrala/pkg/messaging"
|
|
"github.com/absmach/magistrala/pkg/transformers"
|
|
"github.com/absmach/magistrala/pkg/transformers/json"
|
|
"github.com/absmach/magistrala/pkg/transformers/senml"
|
|
"github.com/pelletier/go-toml"
|
|
)
|
|
|
|
const (
|
|
defContentType = "application/senml+json"
|
|
defFormat = "senml"
|
|
)
|
|
|
|
var (
|
|
errOpenConfFile = errors.New("unable to open configuration file")
|
|
errParseConfFile = errors.New("unable to parse configuration file")
|
|
)
|
|
|
|
// Start method starts consuming messages received from Message broker.
|
|
// This method transforms messages to SenML format before
|
|
// using MessageRepository to store them.
|
|
func Start(ctx context.Context, id string, sub messaging.Subscriber, consumer any, configPath string, defaultTopic string, logger *slog.Logger) error {
|
|
cfg, err := loadConfig(configPath, defaultTopic)
|
|
if err != nil {
|
|
logger.Warn(fmt.Sprintf("Failed to load consumer config: %s", err))
|
|
}
|
|
|
|
transformer := makeTransformer(cfg.TransformerCfg, logger)
|
|
|
|
for _, topic := range cfg.SubscriberCfg.Topics {
|
|
subCfg := messaging.SubscriberConfig{
|
|
ID: id,
|
|
Topic: topic,
|
|
DeliveryPolicy: messaging.DeliverAllPolicy,
|
|
}
|
|
switch c := consumer.(type) {
|
|
case AsyncConsumer:
|
|
subCfg.Handler = handleAsync(ctx, transformer, c)
|
|
if err := sub.Subscribe(ctx, subCfg); err != nil {
|
|
return err
|
|
}
|
|
case BlockingConsumer:
|
|
subCfg.Handler = handleSync(ctx, transformer, c)
|
|
if err := sub.Subscribe(ctx, subCfg); err != nil {
|
|
return err
|
|
}
|
|
default:
|
|
return apiutil.ErrInvalidQueryParams
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func handleSync(ctx context.Context, t transformers.Transformer, sc BlockingConsumer) handleFunc {
|
|
return func(msg *messaging.Message) error {
|
|
m := any(msg)
|
|
var err error
|
|
if t != nil {
|
|
m, err = t.Transform(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return sc.ConsumeBlocking(ctx, m)
|
|
}
|
|
}
|
|
|
|
func handleAsync(ctx context.Context, t transformers.Transformer, ac AsyncConsumer) handleFunc {
|
|
return func(msg *messaging.Message) error {
|
|
m := any(msg)
|
|
var err error
|
|
if t != nil {
|
|
m, err = t.Transform(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
ac.ConsumeAsync(ctx, m)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
type handleFunc func(msg *messaging.Message) error
|
|
|
|
func (h handleFunc) Handle(msg *messaging.Message) error {
|
|
return h(msg)
|
|
}
|
|
|
|
func (h handleFunc) Cancel() error {
|
|
return nil
|
|
}
|
|
|
|
type subscriberConfig struct {
|
|
Topics []string `toml:"topics"`
|
|
}
|
|
|
|
type transformerConfig struct {
|
|
Format string `toml:"format"`
|
|
ContentType string `toml:"content_type"`
|
|
TimeFields []json.TimeField `toml:"time_fields"`
|
|
}
|
|
|
|
type config struct {
|
|
SubscriberCfg subscriberConfig `toml:"subscriber"`
|
|
TransformerCfg transformerConfig `toml:"transformer"`
|
|
}
|
|
|
|
func loadConfig(configPath, defaultTopic string) (config, error) {
|
|
cfg := config{
|
|
SubscriberCfg: subscriberConfig{
|
|
Topics: []string{defaultTopic},
|
|
},
|
|
TransformerCfg: transformerConfig{
|
|
Format: defFormat,
|
|
ContentType: defContentType,
|
|
},
|
|
}
|
|
|
|
data, err := os.ReadFile(configPath)
|
|
if err != nil {
|
|
return cfg, errors.Wrap(errOpenConfFile, err)
|
|
}
|
|
|
|
if err := toml.Unmarshal(data, &cfg); err != nil {
|
|
return cfg, errors.Wrap(errParseConfFile, err)
|
|
}
|
|
|
|
return cfg, nil
|
|
}
|
|
|
|
func makeTransformer(cfg transformerConfig, logger *slog.Logger) transformers.Transformer {
|
|
switch strings.ToUpper(cfg.Format) {
|
|
case "SENML":
|
|
logger.Info("Using SenML transformer")
|
|
return senml.New(cfg.ContentType)
|
|
case "JSON":
|
|
logger.Info("Using JSON transformer")
|
|
return json.New(cfg.TimeFields)
|
|
default:
|
|
logger.Warn(fmt.Sprintf("No transformer created: unknown transformer type %s", cfg.Format))
|
|
return nil
|
|
}
|
|
}
|