mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 06:40:19 +00:00
NOISSUE - Add callout in Rule Engine Service (#277)
* add callout to re Signed-off-by: Arvindh <arvindh91@gmail.com> * add callout to re Signed-off-by: Arvindh <arvindh91@gmail.com> * add callout to re Signed-off-by: Arvindh <arvindh91@gmail.com> * add rule events Signed-off-by: Arvindh <arvindh91@gmail.com> * add rule events Signed-off-by: Arvindh <arvindh91@gmail.com> * add rule events Signed-off-by: Arvindh <arvindh91@gmail.com> * remove lints Signed-off-by: Arvindh <arvindh91@gmail.com> * remove lints Signed-off-by: Arvindh <arvindh91@gmail.com> * remove decoders Signed-off-by: Arvindh <arvindh91@gmail.com> * remove lints Signed-off-by: Arvindh <arvindh91@gmail.com> * remove lints Signed-off-by: Arvindh <arvindh91@gmail.com> * replace interface{} with any Signed-off-by: Arvindh <arvindh91@gmail.com> * optimization of event Signed-off-by: Arvindh <arvindh91@gmail.com> * remove lints Signed-off-by: Arvindh <arvindh91@gmail.com> * align code Signed-off-by: Arvindh <arvindh91@gmail.com> --------- Signed-off-by: Arvindh <arvindh91@gmail.com>
This commit is contained in:
+25
-3
@@ -23,6 +23,7 @@ import (
|
||||
"github.com/absmach/magistrala/pkg/ticker"
|
||||
"github.com/absmach/magistrala/re"
|
||||
httpapi "github.com/absmach/magistrala/re/api"
|
||||
"github.com/absmach/magistrala/re/events"
|
||||
"github.com/absmach/magistrala/re/middleware"
|
||||
repg "github.com/absmach/magistrala/re/postgres"
|
||||
grpcClient "github.com/absmach/magistrala/readers/api/grpc"
|
||||
@@ -31,6 +32,7 @@ import (
|
||||
authnsvc "github.com/absmach/supermq/pkg/authn/authsvc"
|
||||
mgauthz "github.com/absmach/supermq/pkg/authz"
|
||||
authzsvc "github.com/absmach/supermq/pkg/authz/authsvc"
|
||||
"github.com/absmach/supermq/pkg/callout"
|
||||
domainsAuthz "github.com/absmach/supermq/pkg/domains/grpcclient"
|
||||
"github.com/absmach/supermq/pkg/grpcclient"
|
||||
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
|
||||
@@ -50,6 +52,7 @@ const (
|
||||
svcName = "rules_engine"
|
||||
envPrefixDB = "MG_RE_DB_"
|
||||
envPrefixHTTP = "MG_RE_HTTP_"
|
||||
envPrefixCallout = "MG_RE_CALLOUT_"
|
||||
envPrefixAuth = "SMQ_AUTH_GRPC_"
|
||||
defDB = "r"
|
||||
defSvcHTTPPort = "9008"
|
||||
@@ -108,6 +111,13 @@ func main() {
|
||||
return
|
||||
}
|
||||
|
||||
callCfg := callout.Config{}
|
||||
if err := env.ParseWithOptions(&callCfg, env.Options{Prefix: envPrefixCallout}); err != nil {
|
||||
logger.Error(fmt.Sprintf("failed to parse callout config : %s", err))
|
||||
exitCode = 1
|
||||
return
|
||||
}
|
||||
|
||||
dbConfig := pgclient.Config{Name: defDB}
|
||||
if err := env.ParseWithOptions(&dbConfig, env.Options{Prefix: envPrefixDB}); err != nil {
|
||||
logger.Error(err.Error())
|
||||
@@ -146,6 +156,13 @@ func main() {
|
||||
return
|
||||
}
|
||||
|
||||
callout, err := callout.New(callCfg)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("failed to create new callout: %s", err))
|
||||
exitCode = 1
|
||||
return
|
||||
}
|
||||
|
||||
msgSub, err := smqbrokers.NewPubSub(ctx, cfg.BrokerURL, logger)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("failed to connect to message broker for mg pubSub: %s", err))
|
||||
@@ -235,7 +252,7 @@ func main() {
|
||||
readersClient := grpcClient.NewReadersClient(client.Connection(), regrpcCfg.Timeout)
|
||||
logger.Info("Readers gRPC client successfully connected to readers gRPC server " + client.Secure())
|
||||
|
||||
svc, err := newService(database, runInfo, msgSub, writersPub, alarmsPub, authz, ec, logger, readersClient)
|
||||
svc, err := newService(ctx, database, runInfo, msgSub, writersPub, alarmsPub, authz, ec, logger, readersClient, callout, cfg)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("failed to create services: %s", err))
|
||||
exitCode = 1
|
||||
@@ -287,7 +304,7 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func newService(db pgclient.Database, runInfo chan pkglog.RunInfo, rePubSub messaging.PubSub, writersPub, alarmsPub messaging.Publisher, authz mgauthz.Authorization, ec email.Config, logger *slog.Logger, readersClient grpcReadersV1.ReadersServiceClient) (re.Service, error) {
|
||||
func newService(ctx context.Context, db pgclient.Database, runInfo chan pkglog.RunInfo, rePubSub messaging.PubSub, writersPub, alarmsPub messaging.Publisher, authz mgauthz.Authorization, ec email.Config, logger *slog.Logger, readersClient grpcReadersV1.ReadersServiceClient, callout callout.Callout, cfg config) (re.Service, error) {
|
||||
repo := repg.NewRepository(db)
|
||||
idp := uuid.New()
|
||||
|
||||
@@ -297,7 +314,12 @@ func newService(db pgclient.Database, runInfo chan pkglog.RunInfo, rePubSub mess
|
||||
}
|
||||
|
||||
csvc := re.NewService(repo, runInfo, idp, rePubSub, writersPub, alarmsPub, ticker.NewTicker(time.Second*30), emailerClient, readersClient)
|
||||
csvc, err = middleware.AuthorizationMiddleware(csvc, authz)
|
||||
csvc, err = events.NewEventStoreMiddleware(ctx, csvc, cfg.ESURL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to init re event store middleware: %w", err)
|
||||
}
|
||||
|
||||
csvc, err = middleware.AuthorizationMiddleware(csvc, authz, callout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
+10
@@ -111,6 +111,16 @@ MG_RE_DB_SSL_ROOT_CERT=
|
||||
MG_RE_INSTANCE_ID=
|
||||
MG_RE_EMAIL_TEMPLATE=re.tmpl
|
||||
|
||||
#### RE Callout
|
||||
MG_RE_CALLOUT_URLS=""
|
||||
MG_RE_CALLOUT_METHOD="POST"
|
||||
MG_RE_CALLOUT_TLS_VERIFICATION="false"
|
||||
MG_RE_CALLOUT_TIMEOUT="10s"
|
||||
MG_RE_CALLOUT_CA_CERT=""
|
||||
MG_RE_CALLOUT_CERT=""
|
||||
MG_RE_CALLOUT_KEY=""
|
||||
MG_RE_CALLOUT_OPERATIONS=""
|
||||
|
||||
MG_EMAIL_HOST=smtp.mailtrap.io
|
||||
MG_EMAIL_PORT=2525
|
||||
MG_EMAIL_USERNAME=18bf7f70705139
|
||||
|
||||
@@ -217,7 +217,16 @@ services:
|
||||
MG_RE_DB_SSL_CERT: ${MG_RE_DB_SSL_CERT}
|
||||
MG_RE_DB_SSL_KEY: ${MG_RE_DB_SSL_KEY}
|
||||
MG_RE_DB_SSL_ROOT_CERT: ${MG_RE_DB_SSL_ROOT_CERT}
|
||||
MG_RE_CALLOUT_URLS: ${MG_RE_CALLOUT_URLS}
|
||||
MG_RE_CALLOUT_METHOD: ${MG_RE_CALLOUT_METHOD}
|
||||
MG_RE_CALLOUT_TLS_VERIFICATION: ${MG_RE_CALLOUT_TLS_VERIFICATION}
|
||||
MG_RE_CALLOUT_TIMEOUT: ${MG_RE_CALLOUT_TIMEOUT}
|
||||
MG_RE_CALLOUT_CA_CERT: ${MG_RE_CALLOUT_CA_CERT}
|
||||
MG_RE_CALLOUT_CERT: ${MG_RE_CALLOUT_CERT}
|
||||
MG_RE_CALLOUT_KEY: ${MG_RE_CALLOUT_KEY}
|
||||
MG_RE_CALLOUT_OPERATIONS: ${MG_RE_CALLOUT_OPERATIONS}
|
||||
SMQ_MESSAGE_BROKER_URL: ${SMQ_MESSAGE_BROKER_URL}
|
||||
SMQ_ES_URL: ${SMQ_ES_URL}
|
||||
SMQ_JAEGER_URL: ${SMQ_JAEGER_URL}
|
||||
SMQ_JAEGER_TRACE_RATIO: ${SMQ_JAEGER_TRACE_RATIO}
|
||||
SMQ_SEND_TELEMETRY: ${SMQ_SEND_TELEMETRY}
|
||||
|
||||
+33
-10
@@ -10,6 +10,14 @@ import (
|
||||
"github.com/absmach/supermq/pkg/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
noneType = "none"
|
||||
hourlyType = "hourly"
|
||||
dailyType = "daily"
|
||||
weeklyType = "weekly"
|
||||
monthlyType = "monthly"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrInvalidRecurringType = errors.New("invalid recurring type")
|
||||
ErrStartDateTimeInPast = errors.New("start_datetime must be greater than or equal to current time")
|
||||
@@ -29,15 +37,15 @@ const (
|
||||
func (rt Recurring) String() string {
|
||||
switch rt {
|
||||
case Hourly:
|
||||
return "hourly"
|
||||
return hourlyType
|
||||
case Daily:
|
||||
return "daily"
|
||||
return dailyType
|
||||
case Weekly:
|
||||
return "weekly"
|
||||
return weeklyType
|
||||
case Monthly:
|
||||
return "monthly"
|
||||
return monthlyType
|
||||
default:
|
||||
return "none"
|
||||
return noneType
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,15 +60,15 @@ func (rt *Recurring) UnmarshalJSON(data []byte) error {
|
||||
}
|
||||
|
||||
switch s {
|
||||
case "hourly":
|
||||
case hourlyType:
|
||||
*rt = Hourly
|
||||
case "daily":
|
||||
case dailyType:
|
||||
*rt = Daily
|
||||
case "weekly":
|
||||
case weeklyType:
|
||||
*rt = Weekly
|
||||
case "monthly":
|
||||
case monthlyType:
|
||||
*rt = Monthly
|
||||
case "none":
|
||||
case noneType:
|
||||
*rt = None
|
||||
default:
|
||||
return ErrInvalidRecurringType
|
||||
@@ -147,3 +155,18 @@ func (s Schedule) NextDue() time.Time {
|
||||
return time.Time{}
|
||||
}
|
||||
}
|
||||
|
||||
// EventEncode converts a schedule.Schedule struct to map[string]any.
|
||||
func (s Schedule) EventEncode() map[string]any {
|
||||
m := map[string]any{
|
||||
"recurring": s.Recurring.String(),
|
||||
"recurring_period": s.RecurringPeriod,
|
||||
}
|
||||
if !s.StartDateTime.IsZero() {
|
||||
m["start_datetime"] = s.StartDateTime.Format(time.RFC3339)
|
||||
}
|
||||
if !s.Time.IsZero() {
|
||||
m["time"] = s.Time.Format(time.RFC3339)
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
// Package events provides the domain concept definitions needed to support
|
||||
// clients events functionality.
|
||||
package events
|
||||
@@ -0,0 +1,189 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"maps"
|
||||
|
||||
"github.com/absmach/magistrala/re"
|
||||
"github.com/absmach/supermq/pkg/authn"
|
||||
"github.com/absmach/supermq/pkg/events"
|
||||
)
|
||||
|
||||
const (
|
||||
rulePrefix = "rule."
|
||||
ruleCreate = rulePrefix + "create"
|
||||
ruleList = rulePrefix + "list"
|
||||
ruleView = rulePrefix + "view"
|
||||
ruleUpdate = rulePrefix + "update"
|
||||
ruleUpdateTags = rulePrefix + "update_tags"
|
||||
ruleUpdateSchedule = rulePrefix + "update_schedule"
|
||||
ruleEnable = rulePrefix + "enable"
|
||||
ruleDisable = rulePrefix + "disable"
|
||||
ruleRemove = rulePrefix + "remove"
|
||||
)
|
||||
|
||||
var (
|
||||
_ events.Event = (*createRuleEvent)(nil)
|
||||
_ events.Event = (*listRuleEvent)(nil)
|
||||
_ events.Event = (*viewRuleEvent)(nil)
|
||||
_ events.Event = (*updateRuleEvent)(nil)
|
||||
_ events.Event = (*updateRuleTagsEvent)(nil)
|
||||
_ events.Event = (*updateRuleScheduleEvent)(nil)
|
||||
_ events.Event = (*enableRuleEvent)(nil)
|
||||
_ events.Event = (*disableRuleEvent)(nil)
|
||||
_ events.Event = (*removeRuleEvent)(nil)
|
||||
)
|
||||
|
||||
type baseRuleEvent struct {
|
||||
session authn.Session
|
||||
requestID string
|
||||
}
|
||||
|
||||
func newBaseRuleEvent(session authn.Session, requestID string) baseRuleEvent {
|
||||
return baseRuleEvent{
|
||||
session: session,
|
||||
requestID: requestID,
|
||||
}
|
||||
}
|
||||
|
||||
func (bre baseRuleEvent) Encode() map[string]any {
|
||||
return map[string]any{
|
||||
"domain": bre.session.DomainID,
|
||||
"user_id": bre.session.UserID,
|
||||
"token_type": bre.session.Type.String(),
|
||||
"super_admin": bre.session.SuperAdmin,
|
||||
"request_id": bre.requestID,
|
||||
}
|
||||
}
|
||||
|
||||
type createRuleEvent struct {
|
||||
rule re.Rule
|
||||
baseRuleEvent
|
||||
}
|
||||
|
||||
func (cre createRuleEvent) Encode() (map[string]any, error) {
|
||||
val, err := cre.rule.EventEncode()
|
||||
if err != nil {
|
||||
return map[string]any{}, err
|
||||
}
|
||||
maps.Copy(val, cre.baseRuleEvent.Encode())
|
||||
val["operation"] = ruleCreate
|
||||
return val, nil
|
||||
}
|
||||
|
||||
type listRuleEvent struct {
|
||||
re.PageMeta
|
||||
baseRuleEvent
|
||||
}
|
||||
|
||||
// Encode implements the events.Event interface for listRuleEvent.
|
||||
func (lre listRuleEvent) Encode() (map[string]any, error) {
|
||||
val := lre.PageMeta.EventEncode()
|
||||
maps.Copy(val, lre.baseRuleEvent.Encode())
|
||||
val["operation"] = ruleList
|
||||
return val, nil
|
||||
}
|
||||
|
||||
type updateRuleEvent struct {
|
||||
rule re.Rule
|
||||
baseRuleEvent
|
||||
}
|
||||
|
||||
type viewRuleEvent struct {
|
||||
rule re.Rule
|
||||
baseRuleEvent
|
||||
}
|
||||
|
||||
func (vre viewRuleEvent) Encode() (map[string]any, error) {
|
||||
val, err := vre.rule.EventEncode()
|
||||
if err != nil {
|
||||
return map[string]any{}, err
|
||||
}
|
||||
maps.Copy(val, vre.baseRuleEvent.Encode())
|
||||
val["operation"] = ruleView
|
||||
return val, nil
|
||||
}
|
||||
|
||||
func (ure updateRuleEvent) Encode() (map[string]any, error) {
|
||||
val, err := ure.rule.EventEncode()
|
||||
if err != nil {
|
||||
return map[string]any{}, err
|
||||
}
|
||||
maps.Copy(val, ure.baseRuleEvent.Encode())
|
||||
val["operation"] = ruleUpdate
|
||||
return val, nil
|
||||
}
|
||||
|
||||
type updateRuleTagsEvent struct {
|
||||
rule re.Rule
|
||||
baseRuleEvent
|
||||
}
|
||||
|
||||
func (urte updateRuleTagsEvent) Encode() (map[string]any, error) {
|
||||
val, err := urte.rule.EventEncode()
|
||||
if err != nil {
|
||||
return map[string]any{}, err
|
||||
}
|
||||
maps.Copy(val, urte.baseRuleEvent.Encode())
|
||||
val["operation"] = ruleUpdateTags
|
||||
return val, nil
|
||||
}
|
||||
|
||||
type updateRuleScheduleEvent struct {
|
||||
rule re.Rule
|
||||
baseRuleEvent
|
||||
}
|
||||
|
||||
func (urse updateRuleScheduleEvent) Encode() (map[string]any, error) {
|
||||
val, err := urse.rule.EventEncode()
|
||||
if err != nil {
|
||||
return map[string]any{}, err
|
||||
}
|
||||
maps.Copy(val, urse.baseRuleEvent.Encode())
|
||||
val["operation"] = ruleUpdateSchedule
|
||||
return val, nil
|
||||
}
|
||||
|
||||
type disableRuleEvent struct {
|
||||
rule re.Rule
|
||||
baseRuleEvent
|
||||
}
|
||||
|
||||
func (dre disableRuleEvent) Encode() (map[string]any, error) {
|
||||
val, err := dre.rule.EventEncode()
|
||||
if err != nil {
|
||||
return map[string]any{}, err
|
||||
}
|
||||
maps.Copy(val, dre.baseRuleEvent.Encode())
|
||||
val["operation"] = ruleDisable
|
||||
return val, nil
|
||||
}
|
||||
|
||||
type enableRuleEvent struct {
|
||||
rule re.Rule
|
||||
baseRuleEvent
|
||||
}
|
||||
|
||||
func (ere enableRuleEvent) Encode() (map[string]any, error) {
|
||||
val, err := ere.rule.EventEncode()
|
||||
if err != nil {
|
||||
return map[string]any{}, err
|
||||
}
|
||||
maps.Copy(val, ere.baseRuleEvent.Encode())
|
||||
val["operation"] = ruleEnable
|
||||
return val, nil
|
||||
}
|
||||
|
||||
type removeRuleEvent struct {
|
||||
id string
|
||||
baseRuleEvent
|
||||
}
|
||||
|
||||
func (rre removeRuleEvent) Encode() (map[string]any, error) {
|
||||
val := rre.baseRuleEvent.Encode()
|
||||
val["id"] = rre.id
|
||||
val["operation"] = ruleRemove
|
||||
return val, nil
|
||||
}
|
||||
@@ -0,0 +1,196 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/absmach/magistrala/re"
|
||||
"github.com/absmach/supermq/pkg/authn"
|
||||
"github.com/absmach/supermq/pkg/events"
|
||||
"github.com/absmach/supermq/pkg/events/store"
|
||||
"github.com/absmach/supermq/pkg/messaging"
|
||||
"github.com/go-chi/chi/v5/middleware"
|
||||
)
|
||||
|
||||
const (
|
||||
supermqPrefix = "supermq."
|
||||
CreateStream = supermqPrefix + ruleCreate
|
||||
ListStream = supermqPrefix + ruleList
|
||||
ViewStream = supermqPrefix + ruleView
|
||||
UpdateStream = supermqPrefix + ruleUpdate
|
||||
UpdateTagsStream = supermqPrefix + ruleUpdateTags
|
||||
UpdateScheduleStream = supermqPrefix + ruleUpdateSchedule
|
||||
EnableStream = supermqPrefix + ruleEnable
|
||||
DisableStream = supermqPrefix + ruleDisable
|
||||
RemoveStream = supermqPrefix + ruleRemove
|
||||
)
|
||||
|
||||
var _ re.Service = (*eventStore)(nil)
|
||||
|
||||
type eventStore struct {
|
||||
events.Publisher
|
||||
svc re.Service
|
||||
}
|
||||
|
||||
// NewEventStoreMiddleware returns wrapper around rules service that sends
|
||||
// events to event store.
|
||||
func NewEventStoreMiddleware(ctx context.Context, svc re.Service, url string) (re.Service, error) {
|
||||
publisher, err := store.NewPublisher(ctx, url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &eventStore{
|
||||
svc: svc,
|
||||
Publisher: publisher,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (es *eventStore) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) {
|
||||
rule, err := es.svc.AddRule(ctx, session, r)
|
||||
if err != nil {
|
||||
return rule, err
|
||||
}
|
||||
event := createRuleEvent{
|
||||
rule: rule,
|
||||
baseRuleEvent: newBaseRuleEvent(session, middleware.GetReqID(ctx)),
|
||||
}
|
||||
if err := es.Publish(ctx, CreateStream, event); err != nil {
|
||||
return rule, err
|
||||
}
|
||||
return rule, nil
|
||||
}
|
||||
|
||||
func (es *eventStore) ListRules(ctx context.Context, session authn.Session, pm re.PageMeta) (re.Page, error) {
|
||||
page, err := es.svc.ListRules(ctx, session, pm)
|
||||
if err != nil {
|
||||
return page, err
|
||||
}
|
||||
event := listRuleEvent{
|
||||
PageMeta: pm,
|
||||
baseRuleEvent: newBaseRuleEvent(session, middleware.GetReqID(ctx)),
|
||||
}
|
||||
if err := es.Publish(ctx, ListStream, event); err != nil {
|
||||
return page, err
|
||||
}
|
||||
return page, nil
|
||||
}
|
||||
|
||||
func (es *eventStore) ViewRule(ctx context.Context, session authn.Session, id string) (re.Rule, error) {
|
||||
rule, err := es.svc.ViewRule(ctx, session, id)
|
||||
if err != nil {
|
||||
return rule, err
|
||||
}
|
||||
event := viewRuleEvent{
|
||||
rule: rule,
|
||||
baseRuleEvent: newBaseRuleEvent(session, middleware.GetReqID(ctx)),
|
||||
}
|
||||
if err := es.Publish(ctx, ViewStream, event); err != nil {
|
||||
return rule, err
|
||||
}
|
||||
return rule, nil
|
||||
}
|
||||
|
||||
func (es *eventStore) UpdateRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) {
|
||||
rule, err := es.svc.UpdateRule(ctx, session, r)
|
||||
if err != nil {
|
||||
return rule, err
|
||||
}
|
||||
event := updateRuleEvent{
|
||||
rule: rule,
|
||||
baseRuleEvent: newBaseRuleEvent(session, middleware.GetReqID(ctx)),
|
||||
}
|
||||
if err := es.Publish(ctx, UpdateStream, event); err != nil {
|
||||
return rule, err
|
||||
}
|
||||
return rule, nil
|
||||
}
|
||||
|
||||
func (es *eventStore) UpdateRuleTags(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) {
|
||||
rule, err := es.svc.UpdateRuleTags(ctx, session, r)
|
||||
if err != nil {
|
||||
return rule, err
|
||||
}
|
||||
event := updateRuleTagsEvent{
|
||||
rule: rule,
|
||||
baseRuleEvent: newBaseRuleEvent(session, middleware.GetReqID(ctx)),
|
||||
}
|
||||
if err := es.Publish(ctx, UpdateTagsStream, event); err != nil {
|
||||
return rule, err
|
||||
}
|
||||
return rule, nil
|
||||
}
|
||||
|
||||
func (es *eventStore) UpdateRuleSchedule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) {
|
||||
rule, err := es.svc.UpdateRuleSchedule(ctx, session, r)
|
||||
if err != nil {
|
||||
return rule, err
|
||||
}
|
||||
event := updateRuleScheduleEvent{
|
||||
rule: rule,
|
||||
baseRuleEvent: newBaseRuleEvent(session, middleware.GetReqID(ctx)),
|
||||
}
|
||||
if err := es.Publish(ctx, UpdateScheduleStream, event); err != nil {
|
||||
return rule, err
|
||||
}
|
||||
return rule, nil
|
||||
}
|
||||
|
||||
func (es *eventStore) RemoveRule(ctx context.Context, session authn.Session, id string) error {
|
||||
err := es.svc.RemoveRule(ctx, session, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
event := removeRuleEvent{
|
||||
id: id,
|
||||
baseRuleEvent: newBaseRuleEvent(session, middleware.GetReqID(ctx)),
|
||||
}
|
||||
if err := es.Publish(ctx, RemoveStream, event); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (es *eventStore) EnableRule(ctx context.Context, session authn.Session, id string) (re.Rule, error) {
|
||||
rule, err := es.svc.EnableRule(ctx, session, id)
|
||||
if err != nil {
|
||||
return rule, err
|
||||
}
|
||||
event := enableRuleEvent{
|
||||
rule: rule,
|
||||
baseRuleEvent: newBaseRuleEvent(session, middleware.GetReqID(ctx)),
|
||||
}
|
||||
if err := es.Publish(ctx, EnableStream, event); err != nil {
|
||||
return rule, err
|
||||
}
|
||||
return rule, nil
|
||||
}
|
||||
|
||||
func (es *eventStore) DisableRule(ctx context.Context, session authn.Session, id string) (re.Rule, error) {
|
||||
rule, err := es.svc.DisableRule(ctx, session, id)
|
||||
if err != nil {
|
||||
return rule, err
|
||||
}
|
||||
event := disableRuleEvent{
|
||||
rule: rule,
|
||||
baseRuleEvent: newBaseRuleEvent(session, middleware.GetReqID(ctx)),
|
||||
}
|
||||
if err := es.Publish(ctx, DisableStream, event); err != nil {
|
||||
return rule, err
|
||||
}
|
||||
return rule, nil
|
||||
}
|
||||
|
||||
func (es *eventStore) StartScheduler(ctx context.Context) error {
|
||||
return es.svc.StartScheduler(ctx)
|
||||
}
|
||||
|
||||
func (es *eventStore) Handle(msg *messaging.Message) error {
|
||||
return es.svc.Handle(msg)
|
||||
}
|
||||
|
||||
func (es *eventStore) Cancel() error {
|
||||
return es.svc.Cancel()
|
||||
}
|
||||
@@ -5,10 +5,13 @@ package middleware
|
||||
|
||||
import (
|
||||
"context"
|
||||
"maps"
|
||||
"time"
|
||||
|
||||
"github.com/absmach/magistrala/re"
|
||||
"github.com/absmach/supermq/pkg/authn"
|
||||
smqauthz "github.com/absmach/supermq/pkg/authz"
|
||||
"github.com/absmach/supermq/pkg/callout"
|
||||
"github.com/absmach/supermq/pkg/errors"
|
||||
"github.com/absmach/supermq/pkg/messaging"
|
||||
"github.com/absmach/supermq/pkg/policies"
|
||||
@@ -21,16 +24,20 @@ var (
|
||||
errDomainDeleteRules = errors.New("not authorized to delete rules in domain")
|
||||
)
|
||||
|
||||
const entityType = "rule"
|
||||
|
||||
type authorizationMiddleware struct {
|
||||
svc re.Service
|
||||
authz smqauthz.Authorization
|
||||
svc re.Service
|
||||
authz smqauthz.Authorization
|
||||
callout callout.Callout
|
||||
}
|
||||
|
||||
// AuthorizationMiddleware adds authorization to the re service.
|
||||
func AuthorizationMiddleware(svc re.Service, authz smqauthz.Authorization) (re.Service, error) {
|
||||
func AuthorizationMiddleware(svc re.Service, authz smqauthz.Authorization, callout callout.Callout) (re.Service, error) {
|
||||
return &authorizationMiddleware{
|
||||
svc: svc,
|
||||
authz: authz,
|
||||
svc: svc,
|
||||
authz: authz,
|
||||
callout: callout,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -47,6 +54,15 @@ func (am *authorizationMiddleware) AddRule(ctx context.Context, session authn.Se
|
||||
return re.Rule{}, errors.Wrap(errDomainCreateRules, err)
|
||||
}
|
||||
|
||||
params := map[string]any{
|
||||
"entities": r,
|
||||
"count": 1,
|
||||
}
|
||||
|
||||
if err := am.callOut(ctx, session, re.OpAddRule, params); err != nil {
|
||||
return re.Rule{}, err
|
||||
}
|
||||
|
||||
return am.svc.AddRule(ctx, session, r)
|
||||
}
|
||||
|
||||
@@ -63,6 +79,14 @@ func (am *authorizationMiddleware) ViewRule(ctx context.Context, session authn.S
|
||||
return re.Rule{}, errors.Wrap(errDomainViewRules, err)
|
||||
}
|
||||
|
||||
params := map[string]any{
|
||||
"entity_id": id,
|
||||
}
|
||||
|
||||
if err := am.callOut(ctx, session, re.OpViewRule, params); err != nil {
|
||||
return re.Rule{}, err
|
||||
}
|
||||
|
||||
return am.svc.ViewRule(ctx, session, id)
|
||||
}
|
||||
|
||||
@@ -79,6 +103,14 @@ func (am *authorizationMiddleware) UpdateRule(ctx context.Context, session authn
|
||||
return re.Rule{}, errors.Wrap(errDomainUpdateRules, err)
|
||||
}
|
||||
|
||||
params := map[string]any{
|
||||
"entity_id": r.ID,
|
||||
}
|
||||
|
||||
if err := am.callOut(ctx, session, re.OpUpdateRule, params); err != nil {
|
||||
return re.Rule{}, err
|
||||
}
|
||||
|
||||
return am.svc.UpdateRule(ctx, session, r)
|
||||
}
|
||||
|
||||
@@ -95,6 +127,14 @@ func (am *authorizationMiddleware) UpdateRuleTags(ctx context.Context, session a
|
||||
return re.Rule{}, errors.Wrap(errDomainUpdateRules, err)
|
||||
}
|
||||
|
||||
params := map[string]any{
|
||||
"entity_id": r.ID,
|
||||
}
|
||||
|
||||
if err := am.callOut(ctx, session, re.OpUpdateRuleTags, params); err != nil {
|
||||
return re.Rule{}, err
|
||||
}
|
||||
|
||||
return am.svc.UpdateRuleTags(ctx, session, r)
|
||||
}
|
||||
|
||||
@@ -111,6 +151,14 @@ func (am *authorizationMiddleware) UpdateRuleSchedule(ctx context.Context, sessi
|
||||
return re.Rule{}, errors.Wrap(errDomainUpdateRules, err)
|
||||
}
|
||||
|
||||
params := map[string]any{
|
||||
"entity_id": r.ID,
|
||||
}
|
||||
|
||||
if err := am.callOut(ctx, session, re.OpUpdateRuleSchedule, params); err != nil {
|
||||
return re.Rule{}, err
|
||||
}
|
||||
|
||||
return am.svc.UpdateRuleSchedule(ctx, session, r)
|
||||
}
|
||||
|
||||
@@ -127,6 +175,14 @@ func (am *authorizationMiddleware) ListRules(ctx context.Context, session authn.
|
||||
return re.Page{}, errors.Wrap(errDomainViewRules, err)
|
||||
}
|
||||
|
||||
params := map[string]any{
|
||||
"pagemeta": pm,
|
||||
}
|
||||
|
||||
if err := am.callOut(ctx, session, re.OpListRules, params); err != nil {
|
||||
return re.Page{}, err
|
||||
}
|
||||
|
||||
return am.svc.ListRules(ctx, session, pm)
|
||||
}
|
||||
|
||||
@@ -143,6 +199,14 @@ func (am *authorizationMiddleware) RemoveRule(ctx context.Context, session authn
|
||||
return errors.Wrap(errDomainDeleteRules, err)
|
||||
}
|
||||
|
||||
params := map[string]any{
|
||||
"entity_id": id,
|
||||
}
|
||||
|
||||
if err := am.callOut(ctx, session, re.OpRemoveRule, params); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return am.svc.RemoveRule(ctx, session, id)
|
||||
}
|
||||
|
||||
@@ -159,6 +223,14 @@ func (am *authorizationMiddleware) EnableRule(ctx context.Context, session authn
|
||||
return re.Rule{}, errors.Wrap(errDomainUpdateRules, err)
|
||||
}
|
||||
|
||||
params := map[string]any{
|
||||
"entity_id": id,
|
||||
}
|
||||
|
||||
if err := am.callOut(ctx, session, re.OpEnableRule, params); err != nil {
|
||||
return re.Rule{}, err
|
||||
}
|
||||
|
||||
return am.svc.EnableRule(ctx, session, id)
|
||||
}
|
||||
|
||||
@@ -175,6 +247,14 @@ func (am *authorizationMiddleware) DisableRule(ctx context.Context, session auth
|
||||
return re.Rule{}, errors.Wrap(errDomainUpdateRules, err)
|
||||
}
|
||||
|
||||
params := map[string]any{
|
||||
"entity_id": id,
|
||||
}
|
||||
|
||||
if err := am.callOut(ctx, session, re.OpDisableRule, params); err != nil {
|
||||
return re.Rule{}, err
|
||||
}
|
||||
|
||||
return am.svc.DisableRule(ctx, session, id)
|
||||
}
|
||||
|
||||
@@ -196,3 +276,21 @@ func (am *authorizationMiddleware) authorize(ctx context.Context, pr smqauthz.Po
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (am *authorizationMiddleware) callOut(ctx context.Context, session authn.Session, op string, params map[string]any) error {
|
||||
pl := map[string]any{
|
||||
"entity_type": entityType,
|
||||
"subject_type": policies.UserType,
|
||||
"subject_id": session.UserID,
|
||||
"domain": session.DomainID,
|
||||
"time": time.Now().UTC(),
|
||||
}
|
||||
|
||||
maps.Copy(params, pl)
|
||||
|
||||
if err := am.callout.Callout(ctx, op, params); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
+111
-2
@@ -20,12 +20,24 @@ const (
|
||||
GoType
|
||||
)
|
||||
|
||||
const (
|
||||
OpAddRule = "OpAddRule"
|
||||
OpViewRule = "OpViewRule"
|
||||
OpUpdateRule = "OpUpdateRule"
|
||||
OpUpdateRuleTags = "OpUpdateRuleTags"
|
||||
OpUpdateRuleSchedule = "OpUpdateRuleSchedule"
|
||||
OpListRules = "OpListRules"
|
||||
OpRemoveRule = "OpRemoveRule"
|
||||
OpEnableRule = "OpEnableRule"
|
||||
OpDisableRule = "OpDisableRule"
|
||||
)
|
||||
|
||||
type (
|
||||
// ScriptType indicates Runtime type for the future versions
|
||||
// that will support JS or Go runtimes alongside Lua.
|
||||
ScriptType uint
|
||||
|
||||
Metadata map[string]interface{}
|
||||
Metadata map[string]any
|
||||
Script struct {
|
||||
Type ScriptType `json:"type"`
|
||||
Value string `json:"value"`
|
||||
@@ -58,6 +70,59 @@ type Rule struct {
|
||||
UpdatedBy string `json:"updated_by"`
|
||||
}
|
||||
|
||||
// EventEncode converts a Rule struct to map[string]any at event producer.
|
||||
func (r Rule) EventEncode() (map[string]any, error) {
|
||||
m := map[string]any{
|
||||
"id": r.ID,
|
||||
"name": r.Name,
|
||||
"created_at": r.CreatedAt.Format(time.RFC3339Nano),
|
||||
"created_by": r.CreatedBy,
|
||||
"schedule": r.Schedule.EventEncode(),
|
||||
"status": r.Status.String(),
|
||||
}
|
||||
|
||||
if r.Name != "" {
|
||||
m["name"] = r.Name
|
||||
}
|
||||
|
||||
if r.DomainID != "" {
|
||||
m["domain"] = r.DomainID
|
||||
}
|
||||
|
||||
if !r.UpdatedAt.IsZero() {
|
||||
m["updated_at"] = r.UpdatedAt.Format(time.RFC3339Nano)
|
||||
}
|
||||
|
||||
if r.UpdatedBy != "" {
|
||||
m["updated_by"] = r.UpdatedBy
|
||||
}
|
||||
|
||||
if len(r.Metadata) > 0 {
|
||||
m["metadata"] = r.Metadata
|
||||
}
|
||||
|
||||
if len(r.Tags) > 0 {
|
||||
m["tags"] = r.Tags
|
||||
}
|
||||
|
||||
if r.InputChannel != "" {
|
||||
m["input_channel"] = r.InputChannel
|
||||
}
|
||||
|
||||
if r.InputTopic != "" {
|
||||
m["input_topic"] = r.InputTopic
|
||||
}
|
||||
|
||||
if r.Logic.Value != "" {
|
||||
m["logic"] = map[string]any{
|
||||
"type": r.Logic.Type,
|
||||
"value": r.Logic.Value,
|
||||
}
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
type Outputs []Runnable
|
||||
|
||||
func (o *Outputs) UnmarshalJSON(data []byte) error {
|
||||
@@ -93,7 +158,7 @@ func (o *Outputs) UnmarshalJSON(data []byte) error {
|
||||
}
|
||||
|
||||
type Runnable interface {
|
||||
Run(ctx context.Context, msg *messaging.Message, val interface{}) error
|
||||
Run(ctx context.Context, msg *messaging.Message, val any) error
|
||||
}
|
||||
|
||||
// PageMeta contains page metadata that helps navigation.
|
||||
@@ -116,6 +181,50 @@ type PageMeta struct {
|
||||
Recurring *schedule.Recurring `json:"recurring,omitempty" db:"recurring"` // Filter by recurring type
|
||||
}
|
||||
|
||||
// EventEncode converts a PageMeta struct to map[string]any.
|
||||
func (pm PageMeta) EventEncode() map[string]any {
|
||||
m := map[string]any{
|
||||
"total": pm.Total,
|
||||
"offset": pm.Offset,
|
||||
"limit": pm.Limit,
|
||||
"status": pm.Status.String(),
|
||||
"domain_id": pm.Domain,
|
||||
}
|
||||
|
||||
if pm.Dir != "" {
|
||||
m["dir"] = pm.Dir
|
||||
}
|
||||
if pm.Name != "" {
|
||||
m["name"] = pm.Name
|
||||
}
|
||||
if pm.InputChannel != "" {
|
||||
m["input_channel"] = pm.InputChannel
|
||||
}
|
||||
if pm.InputTopic != nil {
|
||||
m["input_topic"] = *pm.InputTopic
|
||||
}
|
||||
if pm.Scheduled != nil {
|
||||
m["scheduled"] = *pm.Scheduled
|
||||
}
|
||||
if pm.OutputChannel != "" {
|
||||
m["output_channel"] = pm.OutputChannel
|
||||
}
|
||||
if pm.Tag != "" {
|
||||
m["tag"] = pm.Tag
|
||||
}
|
||||
if pm.ScheduledBefore != nil {
|
||||
m["scheduled_before"] = pm.ScheduledBefore.Format(time.RFC3339Nano)
|
||||
}
|
||||
if pm.ScheduledAfter != nil {
|
||||
m["scheduled_after"] = pm.ScheduledAfter.Format(time.RFC3339Nano)
|
||||
}
|
||||
if pm.Recurring != nil {
|
||||
m["recurring"] = pm.Recurring.String()
|
||||
}
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
type Page struct {
|
||||
Offset uint64 `json:"offset"`
|
||||
Limit uint64 `json:"limit"`
|
||||
|
||||
Reference in New Issue
Block a user