mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 07:30:25 +00:00
7fb5dd7b55
* add access control to rules engine Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * update authorization method Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * revert code Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * initial implementation Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * remove domain from method Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix failing linter Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix userid parameter Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * update checksuperadmin method Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * revert changes Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * address comments Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> --------- Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>
259 lines
8.2 KiB
Go
259 lines
8.2 KiB
Go
// Copyright (c) Abstract Machines
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package re
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
"github.com/absmach/magistrala/pkg/schedule"
|
|
"github.com/absmach/magistrala/re/outputs"
|
|
"github.com/absmach/supermq/pkg/authn"
|
|
"github.com/absmach/supermq/pkg/errors"
|
|
"github.com/absmach/supermq/pkg/messaging"
|
|
"github.com/absmach/supermq/pkg/roles"
|
|
)
|
|
|
|
const (
|
|
LuaType ScriptType = iota
|
|
GoType
|
|
)
|
|
|
|
const TimeLayout = "2006-01-02T15:04:05.999999Z"
|
|
|
|
type (
|
|
// ScriptType indicates Runtime type for the future versions
|
|
// that will support JS or Go runtimes alongside Lua.
|
|
ScriptType uint
|
|
|
|
Metadata map[string]any
|
|
Script struct {
|
|
Type ScriptType `json:"type"`
|
|
Value string `json:"value"`
|
|
}
|
|
)
|
|
|
|
var outputRegistry = map[outputs.OutputType]func() Runnable{
|
|
outputs.AlarmsType: func() Runnable { return &outputs.Alarm{} },
|
|
outputs.EmailType: func() Runnable { return &outputs.Email{} },
|
|
outputs.SaveRemotePgType: func() Runnable { return &outputs.Postgres{} },
|
|
outputs.ChannelsType: func() Runnable { return &outputs.ChannelPublisher{} },
|
|
outputs.SaveSenMLType: func() Runnable { return &outputs.SenML{} },
|
|
outputs.SlackType: func() Runnable { return &outputs.Slack{} },
|
|
}
|
|
|
|
type Rule struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
DomainID string `json:"domain"`
|
|
Metadata Metadata `json:"metadata,omitempty"`
|
|
Tags []string `json:"tags,omitempty"`
|
|
InputChannel string `json:"input_channel"`
|
|
InputTopic string `json:"input_topic"`
|
|
Logic Script `json:"logic"`
|
|
Outputs Outputs `json:"outputs,omitempty"`
|
|
Schedule schedule.Schedule `json:"schedule,omitempty"`
|
|
Status Status `json:"status"`
|
|
CreatedAt time.Time `json:"created_at"`
|
|
CreatedBy string `json:"created_by"`
|
|
UpdatedAt time.Time `json:"updated_at"`
|
|
UpdatedBy string `json:"updated_by"`
|
|
Roles []roles.MemberRoleActions `json:"roles,omitempty"`
|
|
}
|
|
|
|
// EventEncode converts a Rule struct to map[string]any at event producer.
|
|
func (r Rule) EventEncode() (map[string]any, error) {
|
|
m := map[string]any{
|
|
"id": r.ID,
|
|
"name": r.Name,
|
|
"created_at": r.CreatedAt.Format(TimeLayout),
|
|
"created_by": r.CreatedBy,
|
|
"schedule": r.Schedule.EventEncode(),
|
|
"status": r.Status.String(),
|
|
}
|
|
|
|
if r.Name != "" {
|
|
m["name"] = r.Name
|
|
}
|
|
|
|
if r.DomainID != "" {
|
|
m["domain"] = r.DomainID
|
|
}
|
|
|
|
if !r.UpdatedAt.IsZero() {
|
|
m["updated_at"] = r.UpdatedAt.Format(TimeLayout)
|
|
}
|
|
|
|
if r.UpdatedBy != "" {
|
|
m["updated_by"] = r.UpdatedBy
|
|
}
|
|
|
|
if len(r.Metadata) > 0 {
|
|
m["metadata"] = r.Metadata
|
|
}
|
|
|
|
if len(r.Tags) > 0 {
|
|
m["tags"] = r.Tags
|
|
}
|
|
|
|
if r.InputChannel != "" {
|
|
m["input_channel"] = r.InputChannel
|
|
}
|
|
|
|
if r.InputTopic != "" {
|
|
m["input_topic"] = r.InputTopic
|
|
}
|
|
|
|
if r.Logic.Value != "" {
|
|
m["logic"] = map[string]any{
|
|
"type": r.Logic.Type,
|
|
"value": r.Logic.Value,
|
|
}
|
|
}
|
|
|
|
return m, nil
|
|
}
|
|
|
|
type Outputs []Runnable
|
|
|
|
func (o *Outputs) UnmarshalJSON(data []byte) error {
|
|
var rawList []json.RawMessage
|
|
if err := json.Unmarshal(data, &rawList); err != nil {
|
|
return err
|
|
}
|
|
|
|
var runnables []Runnable
|
|
for _, raw := range rawList {
|
|
var meta struct {
|
|
Type outputs.OutputType `json:"type"`
|
|
}
|
|
if err := json.Unmarshal(raw, &meta); err != nil {
|
|
return err
|
|
}
|
|
|
|
factory, ok := outputRegistry[meta.Type]
|
|
if !ok {
|
|
return errors.New("unknown output type: " + meta.Type.String())
|
|
}
|
|
|
|
instance := factory()
|
|
if err := json.Unmarshal(raw, instance); err != nil {
|
|
return err
|
|
}
|
|
|
|
runnables = append(runnables, instance)
|
|
}
|
|
v := Outputs(runnables)
|
|
*o = v
|
|
return nil
|
|
}
|
|
|
|
type Runnable interface {
|
|
Run(ctx context.Context, msg *messaging.Message, val any) error
|
|
}
|
|
|
|
// PageMeta contains page metadata that helps navigation.
|
|
type PageMeta struct {
|
|
Total uint64 `json:"total" db:"total"`
|
|
Offset uint64 `json:"offset" db:"offset"`
|
|
Limit uint64 `json:"limit" db:"limit"`
|
|
Dir string `json:"dir" db:"dir"`
|
|
Order string `json:"order" db:"order"`
|
|
Name string `json:"name" db:"name"`
|
|
InputChannel string `json:"input_channel,omitempty" db:"input_channel"`
|
|
InputTopic *string `json:"input_topic,omitempty" db:"input_topic"`
|
|
Scheduled *bool `json:"scheduled,omitempty"`
|
|
OutputChannel string `json:"output_channel,omitempty" db:"output_channel"`
|
|
Status Status `json:"status,omitempty" db:"status"`
|
|
Domain string `json:"domain_id,omitempty" db:"domain_id"`
|
|
Tag string `json:"tag,omitempty"`
|
|
ScheduledBefore *time.Time `json:"scheduled_before,omitempty" db:"scheduled_before"` // Filter rules scheduled before this time
|
|
ScheduledAfter *time.Time `json:"scheduled_after,omitempty" db:"scheduled_after"` // Filter rules scheduled after this time
|
|
Recurring *schedule.Recurring `json:"recurring,omitempty" db:"recurring"` // Filter by recurring type
|
|
UserID string `json:"user_id,omitempty" db:"user_id"`
|
|
}
|
|
|
|
// EventEncode converts a PageMeta struct to map[string]any.
|
|
func (pm PageMeta) EventEncode() map[string]any {
|
|
m := map[string]any{
|
|
"total": pm.Total,
|
|
"offset": pm.Offset,
|
|
"limit": pm.Limit,
|
|
"status": pm.Status.String(),
|
|
"domain_id": pm.Domain,
|
|
}
|
|
|
|
if pm.Dir != "" {
|
|
m["dir"] = pm.Dir
|
|
}
|
|
if pm.Name != "" {
|
|
m["name"] = pm.Name
|
|
}
|
|
if pm.InputChannel != "" {
|
|
m["input_channel"] = pm.InputChannel
|
|
}
|
|
if pm.InputTopic != nil {
|
|
m["input_topic"] = *pm.InputTopic
|
|
}
|
|
if pm.Scheduled != nil {
|
|
m["scheduled"] = *pm.Scheduled
|
|
}
|
|
if pm.OutputChannel != "" {
|
|
m["output_channel"] = pm.OutputChannel
|
|
}
|
|
if pm.Tag != "" {
|
|
m["tag"] = pm.Tag
|
|
}
|
|
if pm.ScheduledBefore != nil {
|
|
m["scheduled_before"] = pm.ScheduledBefore.Format(time.RFC3339Nano)
|
|
}
|
|
if pm.ScheduledAfter != nil {
|
|
m["scheduled_after"] = pm.ScheduledAfter.Format(time.RFC3339Nano)
|
|
}
|
|
if pm.Recurring != nil {
|
|
m["recurring"] = pm.Recurring.String()
|
|
}
|
|
|
|
return m
|
|
}
|
|
|
|
type Page struct {
|
|
Offset uint64 `json:"offset"`
|
|
Limit uint64 `json:"limit"`
|
|
Total uint64 `json:"total"`
|
|
Rules []Rule `json:"rules"`
|
|
}
|
|
|
|
type Service interface {
|
|
messaging.MessageHandler
|
|
AddRule(ctx context.Context, session authn.Session, r Rule) (Rule, []roles.RoleProvision, error)
|
|
ViewRule(ctx context.Context, session authn.Session, id string, withRoles bool) (Rule, error)
|
|
UpdateRule(ctx context.Context, session authn.Session, r Rule) (Rule, error)
|
|
UpdateRuleTags(ctx context.Context, session authn.Session, r Rule) (Rule, error)
|
|
UpdateRuleSchedule(ctx context.Context, session authn.Session, r Rule) (Rule, error)
|
|
ListRules(ctx context.Context, session authn.Session, pm PageMeta) (Page, error)
|
|
RemoveRule(ctx context.Context, session authn.Session, id string) error
|
|
EnableRule(ctx context.Context, session authn.Session, id string) (Rule, error)
|
|
DisableRule(ctx context.Context, session authn.Session, id string) (Rule, error)
|
|
|
|
StartScheduler(ctx context.Context) error
|
|
roles.RoleManager
|
|
}
|
|
|
|
type Repository interface {
|
|
AddRule(ctx context.Context, r Rule) (Rule, error)
|
|
ViewRule(ctx context.Context, id string) (Rule, error)
|
|
RetrieveByIDWithRoles(ctx context.Context, id, memberID string) (Rule, error)
|
|
UpdateRule(ctx context.Context, r Rule) (Rule, error)
|
|
UpdateRuleTags(ctx context.Context, r Rule) (Rule, error)
|
|
UpdateRuleSchedule(ctx context.Context, r Rule) (Rule, error)
|
|
RemoveRule(ctx context.Context, id string) error
|
|
UpdateRuleStatus(ctx context.Context, r Rule) (Rule, error)
|
|
ListAllRules(ctx context.Context, pm PageMeta) (Page, error)
|
|
ListUserRules(ctx context.Context, userID string, pm PageMeta) (Page, error)
|
|
UpdateRuleDue(ctx context.Context, id string, due time.Time) (Rule, error)
|
|
roles.Repository
|
|
}
|