mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 07:10:19 +00:00
2ef8437d8b
* add access control to rules engine Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * add access control to reports Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * add access control to alarms Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix failing linter Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * remove unused variables Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * update authorization method Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * revert code Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * remove roles Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * update alarm permissions Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * update alarm permissions Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * address comments Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix tests Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * revert endpoint changes Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix make fetch Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * revert env variable Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * remove rule prefix Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * remove trailing line Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * remove unused constants Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * re consumer Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * update listing Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix tests Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix linter Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix rule roles interface Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * refactor listing commands Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fetch supermq Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * address coments Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * update script 
Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * address comments Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fetch supermq Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix time layout Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix failing linter Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix failing linter Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix role name Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix failing linter Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * address comments Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * remove white spaces Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * update check usperadmin method Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * update go mod file Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix tests Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * add missing env variable Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> --------- Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>
194 lines
5.0 KiB
Go
194 lines
5.0 KiB
Go
// Copyright (c) Abstract Machines
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package consumer
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
|
|
"github.com/absmach/magistrala/re"
|
|
"github.com/absmach/supermq/pkg/errors"
|
|
"github.com/absmach/supermq/pkg/events"
|
|
"github.com/absmach/supermq/pkg/events/store"
|
|
rconsumer "github.com/absmach/supermq/pkg/roles/rolemanager/events/consumer"
|
|
)
|
|
|
|
// stream is the event store subject pattern used when subscribing to rule
// events.
const stream = "events.supermq.rule.*"

// Values of the "operation" entry of an event message that Handle routes to
// the rule handlers of this package.
const (
	create         = "rule.create"
	update         = "rule.update"
	updateTags     = "rule.update_tags"
	updateSchedule = "rule.update_schedule"
	enable         = "rule.enable"
	disable        = "rule.disable"
	remove         = "rule.remove"
)
|
|
|
|
var (
|
|
errNoOperationKey = errors.New("operation key is not found in event message")
|
|
errAddRuleEvent = errors.New("failed to consume rule create event")
|
|
errUpdateRuleEvent = errors.New("failed to consume rule update event")
|
|
errUpdateRuleTagsEvent = errors.New("failed to consume rule update tags event")
|
|
errUpdateRuleScheduleEvent = errors.New("failed to consume rule update schedule event")
|
|
errEnableRuleEvent = errors.New("failed to consume rule enable event")
|
|
errDisableRuleEvent = errors.New("failed to consume rule disable event")
|
|
errRemoveRuleEvent = errors.New("failed to consume rule remove event")
|
|
)
|
|
|
|
type eventHandler struct {
|
|
repo re.Repository
|
|
rolesEventHandler rconsumer.EventHandler
|
|
}
|
|
|
|
func RulesEventsSubscribe(ctx context.Context, repo re.Repository, esURL, esConsumerName string, logger *slog.Logger) error {
|
|
subscriber, err := store.NewSubscriber(ctx, esURL, logger)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
subConfig := events.SubscriberConfig{
|
|
Stream: stream,
|
|
Consumer: esConsumerName,
|
|
Handler: NewEventHandler(repo),
|
|
Ordered: true,
|
|
}
|
|
return subscriber.Subscribe(ctx, subConfig)
|
|
}
|
|
|
|
// NewEventHandler returns new event store handler.
|
|
func NewEventHandler(repo re.Repository) events.EventHandler {
|
|
reh := rconsumer.NewEventHandler("rule", repo)
|
|
return &eventHandler{
|
|
repo: repo,
|
|
rolesEventHandler: reh,
|
|
}
|
|
}
|
|
|
|
func (es *eventHandler) Handle(ctx context.Context, event events.Event) error {
|
|
msg, err := event.Encode()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
op, ok := msg["operation"]
|
|
if !ok {
|
|
return errNoOperationKey
|
|
}
|
|
|
|
switch op {
|
|
case create:
|
|
return es.addRuleHandler(ctx, msg)
|
|
case update:
|
|
return es.updateRuleHandler(ctx, msg)
|
|
case updateTags:
|
|
return es.updateRuleTagsHandler(ctx, msg)
|
|
case updateSchedule:
|
|
return es.updateRuleScheduleHandler(ctx, msg)
|
|
case enable:
|
|
return es.enableRuleHandler(ctx, msg)
|
|
case disable:
|
|
return es.disableRuleHandler(ctx, msg)
|
|
case remove:
|
|
return es.removeRuleHandler(ctx, msg)
|
|
}
|
|
|
|
return es.rolesEventHandler.Handle(ctx, op, msg)
|
|
}
|
|
|
|
func (es *eventHandler) addRuleHandler(ctx context.Context, data map[string]any) error {
|
|
r, rps, err := decodeAddRuleEvent(data)
|
|
if err != nil {
|
|
return errors.Wrap(errAddRuleEvent, err)
|
|
}
|
|
|
|
if _, err := es.repo.AddRule(ctx, r); err != nil {
|
|
return errors.Wrap(errAddRuleEvent, err)
|
|
}
|
|
|
|
if _, err := es.repo.AddRoles(ctx, rps); err != nil {
|
|
return errors.Wrap(errAddRuleEvent, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (es *eventHandler) updateRuleHandler(ctx context.Context, data map[string]any) error {
|
|
r, err := decodeUpdateRuleEvent(data)
|
|
if err != nil {
|
|
return errors.Wrap(errUpdateRuleEvent, err)
|
|
}
|
|
|
|
if _, err := es.repo.UpdateRule(ctx, r); err != nil {
|
|
return errors.Wrap(errUpdateRuleEvent, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (es *eventHandler) updateRuleTagsHandler(ctx context.Context, data map[string]any) error {
|
|
r, err := decodeUpdateRuleTagsEvent(data)
|
|
if err != nil {
|
|
return errors.Wrap(errUpdateRuleTagsEvent, err)
|
|
}
|
|
|
|
if _, err := es.repo.UpdateRuleTags(ctx, r); err != nil {
|
|
return errors.Wrap(errUpdateRuleTagsEvent, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (es *eventHandler) updateRuleScheduleHandler(ctx context.Context, data map[string]any) error {
|
|
r, err := decodeUpdateRuleScheduleEvent(data)
|
|
if err != nil {
|
|
return errors.Wrap(errUpdateRuleScheduleEvent, err)
|
|
}
|
|
|
|
if _, err := es.repo.UpdateRuleSchedule(ctx, r); err != nil {
|
|
return errors.Wrap(errUpdateRuleScheduleEvent, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (es *eventHandler) enableRuleHandler(ctx context.Context, data map[string]any) error {
|
|
r, err := decodeEnableRuleEvent(data)
|
|
if err != nil {
|
|
return errors.Wrap(errEnableRuleEvent, err)
|
|
}
|
|
|
|
if _, err := es.repo.UpdateRuleStatus(ctx, r); err != nil {
|
|
return errors.Wrap(errEnableRuleEvent, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (es *eventHandler) disableRuleHandler(ctx context.Context, data map[string]any) error {
|
|
r, err := decodeDisableRuleEvent(data)
|
|
if err != nil {
|
|
return errors.Wrap(errDisableRuleEvent, err)
|
|
}
|
|
|
|
if _, err := es.repo.UpdateRuleStatus(ctx, r); err != nil {
|
|
return errors.Wrap(errDisableRuleEvent, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (es *eventHandler) removeRuleHandler(ctx context.Context, data map[string]any) error {
|
|
id, err := decodeRemoveRuleEvent(data)
|
|
if err != nil {
|
|
return errors.Wrap(errRemoveRuleEvent, err)
|
|
}
|
|
|
|
if err := es.repo.RemoveRule(ctx, id); err != nil {
|
|
return errors.Wrap(errRemoveRuleEvent, err)
|
|
}
|
|
|
|
return nil
|
|
}
|