MG-370 - Add fine grained access control to alarms (#404)
Continuous Delivery / Build and Push (push) Has been cancelled
Check License Header / check-license (push) Has been cancelled
Deploy GitHub Pages / swagger-ui (push) Has been cancelled

* add access control to rules engine

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* add access control to reports

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* add access control to alarms

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix failing linter

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* remove unused variables

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* update authorization method

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* revert code

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* remove roles

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* update alarm permissions

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* update alarm permissions

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* address comments

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix tests

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* revert endpoint changes

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix make fetch

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* revert env variable

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* remove rule prefix

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* remove trailing line

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* remove unused constants

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* re consumer

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* update listing

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix tests

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix linter

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix rule roles interface

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* refactor listing commands

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fetch supermq

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* address coments

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* update script

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* address comments

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fetch supermq

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix time layout

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix failing linter

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix failing linter

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix role name

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix failing linter

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* address comments

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* remove white spaces

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* update check usperadmin method

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* update go mod file

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix tests

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* add missing env variable

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

---------

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>
This commit is contained in:
Steve Munene
2026-03-13 16:29:32 +03:00
committed by GitHub
parent 6dbcfcae58
commit 2ef8437d8b
43 changed files with 1071 additions and 292 deletions
+3 -1
View File
@@ -72,6 +72,7 @@ type PageMetadata struct {
AssignedBy string `json:"assigned_by" db:"assigned_by"`
AcknowledgedBy string `json:"acknowledged_by" db:"acknowledged_by"`
ResolvedBy string `json:"resolved_by" db:"resolved_by"`
UserID string `json:"user_id" db:"user_id"`
}
func (a Alarm) Validate() error {
@@ -116,6 +117,7 @@ type Repository interface {
CreateAlarm(ctx context.Context, alarm Alarm) (Alarm, error)
UpdateAlarm(ctx context.Context, alarm Alarm) (Alarm, error)
ViewAlarm(ctx context.Context, alarmID, domainID string) (Alarm, error)
ListAlarms(ctx context.Context, pm PageMetadata) (AlarmsPage, error)
ListAllAlarms(ctx context.Context, pm PageMetadata) (AlarmsPage, error)
ListUserAlarms(ctx context.Context, userID string, pm PageMetadata) (AlarmsPage, error)
DeleteAlarm(ctx context.Context, id string) error
}
+1 -1
View File
@@ -16,7 +16,7 @@ import (
func updateAlarmEndpoint(svc alarms.Service) endpoint.Endpoint {
return func(ctx context.Context, request any) (any, error) {
req := request.(alarmReq)
req := request.(updateAlarmReq)
if err := req.validate(); err != nil {
return alarmRes{}, errors.Wrap(apiutil.ErrValidation, err)
}
+15
View File
@@ -23,6 +23,21 @@ func (req alarmReq) validate() error {
return nil
}
type updateAlarmReq struct {
alarms.Alarm `json:",inline"`
}
func (req updateAlarmReq) validate() error {
if req.Alarm.ID == "" {
return errors.New("missing alarm id")
}
if req.Alarm.AssigneeID == "" && req.Alarm.AcknowledgedBy == "" && req.Alarm.ResolvedBy == "" {
return errors.New("at least one of assignee_id, acknowledged_by, or resolved_by must be set")
}
return nil
}
type listAlarmsReq struct {
alarms.PageMetadata
}
+3 -3
View File
@@ -195,12 +195,12 @@ func decodeAlarmReq(_ context.Context, r *http.Request) (any, error) {
func decodeUpdateAlarmReq(_ context.Context, r *http.Request) (any, error) {
if !strings.Contains(r.Header.Get("Content-Type"), api.ContentType) {
return alarmReq{}, apiutil.ErrUnsupportedContentType
return updateAlarmReq{}, apiutil.ErrUnsupportedContentType
}
req := alarmReq{}
req := updateAlarmReq{}
if err := json.NewDecoder(r.Body).Decode(&req.Alarm); err != nil {
return alarmReq{}, errors.Wrap(apiutil.ErrMalformedRequestBody, err)
return updateAlarmReq{}, errors.Wrap(apiutil.ErrMalformedRequestBody, err)
}
req.Alarm.ID = chi.URLParam(r, "alarmID")
+64 -27
View File
@@ -7,10 +7,12 @@ import (
"context"
"github.com/absmach/magistrala/alarms"
"github.com/absmach/magistrala/alarms/operations"
"github.com/absmach/supermq/auth"
"github.com/absmach/supermq/pkg/authn"
smqauthz "github.com/absmach/supermq/pkg/authz"
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/permissions"
"github.com/absmach/supermq/pkg/policies"
)
@@ -22,37 +24,40 @@ var (
)
type authorizationMiddleware struct {
svc alarms.Service
authz smqauthz.Authorization
svc alarms.Service
authz smqauthz.Authorization
entitiesOps permissions.EntitiesOperations[permissions.Operation]
}
var _ alarms.Service = (*authorizationMiddleware)(nil)
func NewAuthorizationMiddleware(svc alarms.Service, authz smqauthz.Authorization) alarms.Service {
return &authorizationMiddleware{
svc: svc,
authz: authz,
func NewAuthorizationMiddleware(svc alarms.Service, authz smqauthz.Authorization, entitiesOps permissions.EntitiesOperations[permissions.Operation]) (alarms.Service, error) {
if err := entitiesOps.Validate(); err != nil {
return nil, err
}
return &authorizationMiddleware{
svc: svc,
authz: authz,
entitiesOps: entitiesOps,
}, nil
}
func (am *authorizationMiddleware) CreateAlarm(ctx context.Context, alarm alarms.Alarm) (err error) {
func (am *authorizationMiddleware) CreateAlarm(ctx context.Context, alarm alarms.Alarm) error {
return am.svc.CreateAlarm(ctx, alarm)
}
func (am *authorizationMiddleware) UpdateAlarm(ctx context.Context, session authn.Session, alarm alarms.Alarm) (dba alarms.Alarm, err error) {
// If assignee is present, check if assignee is member of domain
if err := am.authorize(ctx, alarms.OpUpdateAlarm, session); err != nil {
return alarms.Alarm{}, errors.Wrap(errDomainUpdateAlarms, err)
}
func (am *authorizationMiddleware) UpdateAlarm(ctx context.Context, session authn.Session, alarm alarms.Alarm) (alarms.Alarm, error) {
if alarm.AssigneeID != "" {
domainUserId := auth.EncodeDomainUserID(session.DomainID, alarm.AssigneeID)
if err := am.authorize(ctx, operations.OpAssignAlarm, session, policies.DomainType, session.DomainID); err != nil {
return alarms.Alarm{}, errors.Wrap(errDomainUpdateAlarms, err)
}
domainUserID := auth.EncodeDomainUserID(session.DomainID, alarm.AssigneeID)
if err := am.authz.Authorize(ctx, smqauthz.PolicyReq{
Domain: session.DomainID,
SubjectType: policies.UserType,
SubjectKind: policies.UsersKind,
Subject: domainUserId,
Subject: domainUserID,
Permission: policies.MembershipPermission,
ObjectType: policies.DomainType,
Object: session.DomainID,
@@ -61,11 +66,23 @@ func (am *authorizationMiddleware) UpdateAlarm(ctx context.Context, session auth
}
}
if alarm.AcknowledgedBy != "" {
if err := am.authorize(ctx, operations.OpAcknowledgeAlarm, session, policies.DomainType, session.DomainID); err != nil {
return alarms.Alarm{}, errors.Wrap(errDomainUpdateAlarms, err)
}
}
if alarm.ResolvedBy != "" {
if err := am.authorize(ctx, operations.OpResolveAlarm, session, policies.DomainType, session.DomainID); err != nil {
return alarms.Alarm{}, errors.Wrap(errDomainUpdateAlarms, err)
}
}
return am.svc.UpdateAlarm(ctx, session, alarm)
}
func (am *authorizationMiddleware) DeleteAlarm(ctx context.Context, session authn.Session, id string) error {
if err := am.authorize(ctx, alarms.OpDeleteAlarm, session); err != nil {
if err := am.authorize(ctx, operations.OpDeleteAlarm, session, policies.DomainType, session.DomainID); err != nil {
return errors.Wrap(errDomainDeleteAlarms, err)
}
@@ -77,23 +94,27 @@ func (am *authorizationMiddleware) ListAlarms(ctx context.Context, session authn
pm.DomainID = session.DomainID
}
if err := am.authorize(ctx, alarms.OpListAlarms, session); err != nil {
return alarms.AlarmsPage{}, errors.Wrap(errDomainViewAlarms, err)
switch err := am.checkSuperAdmin(ctx, session); {
case err == nil:
session.SuperAdmin = true
case errors.Contains(err, svcerr.ErrSuperAdminAction):
default:
return alarms.AlarmsPage{}, err
}
return am.svc.ListAlarms(ctx, session, pm)
}
func (am *authorizationMiddleware) ViewAlarm(ctx context.Context, session authn.Session, id string) (alarms.Alarm, error) {
if err := am.authorize(ctx, alarms.OpViewAlarm, session); err != nil {
if err := am.authorize(ctx, operations.OpViewAlarm, session, policies.DomainType, session.DomainID); err != nil {
return alarms.Alarm{}, errors.Wrap(errDomainViewAlarms, err)
}
return am.svc.ViewAlarm(ctx, session, id)
}
func (am *authorizationMiddleware) authorize(ctx context.Context, op permissions.Operation, session authn.Session) error {
perm, err := alarms.GetPermission(op)
func (am *authorizationMiddleware) authorize(ctx context.Context, op permissions.Operation, session authn.Session, objType, obj string) error {
perm, err := am.entitiesOps.GetPermission(operations.EntityType, op)
if err != nil {
return err
}
@@ -103,19 +124,19 @@ func (am *authorizationMiddleware) authorize(ctx context.Context, op permissions
SubjectType: policies.UserType,
SubjectKind: policies.UsersKind,
Subject: session.DomainUserID,
Object: session.DomainID,
ObjectType: policies.DomainType,
Permission: perm,
Object: obj,
ObjectType: objType,
Permission: perm.String(),
}
var pat *smqauthz.PATReq
if session.PatID != "" {
opName := alarms.OperationName(op)
opName := am.entitiesOps.OperationName(operations.EntityType, op)
pat = &smqauthz.PATReq{
UserID: session.UserID,
PatID: session.PatID,
EntityID: session.DomainID,
EntityType: alarms.EntityType,
EntityType: operations.EntityType,
Operation: opName,
Domain: session.DomainID,
}
@@ -127,3 +148,19 @@ func (am *authorizationMiddleware) authorize(ctx context.Context, op permissions
return nil
}
func (am *authorizationMiddleware) checkSuperAdmin(ctx context.Context, session authn.Session) error {
if session.Role != authn.SuperAdminRole {
return svcerr.ErrSuperAdminAction
}
if err := am.authz.Authorize(ctx, smqauthz.PolicyReq{
SubjectType: policies.UserType,
Subject: session.UserID,
Permission: policies.AdminPermission,
ObjectType: policies.PlatformType,
Object: policies.SuperMQObject,
}, nil); err != nil {
return err
}
return nil
}
+83 -11
View File
@@ -165,12 +165,12 @@ func (_c *Repository_DeleteAlarm_Call) RunAndReturn(run func(ctx context.Context
return _c
}
// ListAlarms provides a mock function for the type Repository
func (_mock *Repository) ListAlarms(ctx context.Context, pm alarms.PageMetadata) (alarms.AlarmsPage, error) {
// ListAllAlarms provides a mock function for the type Repository
func (_mock *Repository) ListAllAlarms(ctx context.Context, pm alarms.PageMetadata) (alarms.AlarmsPage, error) {
ret := _mock.Called(ctx, pm)
if len(ret) == 0 {
panic("no return value specified for ListAlarms")
panic("no return value specified for ListAllAlarms")
}
var r0 alarms.AlarmsPage
@@ -191,19 +191,19 @@ func (_mock *Repository) ListAlarms(ctx context.Context, pm alarms.PageMetadata)
return r0, r1
}
// Repository_ListAlarms_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListAlarms'
type Repository_ListAlarms_Call struct {
// Repository_ListAllAlarms_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListAllAlarms'
type Repository_ListAllAlarms_Call struct {
*mock.Call
}
// ListAlarms is a helper method to define mock.On call
// ListAllAlarms is a helper method to define mock.On call
// - ctx context.Context
// - pm alarms.PageMetadata
func (_e *Repository_Expecter) ListAlarms(ctx interface{}, pm interface{}) *Repository_ListAlarms_Call {
return &Repository_ListAlarms_Call{Call: _e.mock.On("ListAlarms", ctx, pm)}
func (_e *Repository_Expecter) ListAllAlarms(ctx interface{}, pm interface{}) *Repository_ListAllAlarms_Call {
return &Repository_ListAllAlarms_Call{Call: _e.mock.On("ListAllAlarms", ctx, pm)}
}
func (_c *Repository_ListAlarms_Call) Run(run func(ctx context.Context, pm alarms.PageMetadata)) *Repository_ListAlarms_Call {
func (_c *Repository_ListAllAlarms_Call) Run(run func(ctx context.Context, pm alarms.PageMetadata)) *Repository_ListAllAlarms_Call {
_c.Call.Run(func(args mock.Arguments) {
var arg0 context.Context
if args[0] != nil {
@@ -221,12 +221,84 @@ func (_c *Repository_ListAlarms_Call) Run(run func(ctx context.Context, pm alarm
return _c
}
func (_c *Repository_ListAlarms_Call) Return(alarmsPage alarms.AlarmsPage, err error) *Repository_ListAlarms_Call {
func (_c *Repository_ListAllAlarms_Call) Return(alarmsPage alarms.AlarmsPage, err error) *Repository_ListAllAlarms_Call {
_c.Call.Return(alarmsPage, err)
return _c
}
func (_c *Repository_ListAlarms_Call) RunAndReturn(run func(ctx context.Context, pm alarms.PageMetadata) (alarms.AlarmsPage, error)) *Repository_ListAlarms_Call {
func (_c *Repository_ListAllAlarms_Call) RunAndReturn(run func(ctx context.Context, pm alarms.PageMetadata) (alarms.AlarmsPage, error)) *Repository_ListAllAlarms_Call {
_c.Call.Return(run)
return _c
}
// ListUserAlarms provides a mock function for the type Repository
func (_mock *Repository) ListUserAlarms(ctx context.Context, userID string, pm alarms.PageMetadata) (alarms.AlarmsPage, error) {
ret := _mock.Called(ctx, userID, pm)
if len(ret) == 0 {
panic("no return value specified for ListUserAlarms")
}
var r0 alarms.AlarmsPage
var r1 error
if returnFunc, ok := ret.Get(0).(func(context.Context, string, alarms.PageMetadata) (alarms.AlarmsPage, error)); ok {
return returnFunc(ctx, userID, pm)
}
if returnFunc, ok := ret.Get(0).(func(context.Context, string, alarms.PageMetadata) alarms.AlarmsPage); ok {
r0 = returnFunc(ctx, userID, pm)
} else {
r0 = ret.Get(0).(alarms.AlarmsPage)
}
if returnFunc, ok := ret.Get(1).(func(context.Context, string, alarms.PageMetadata) error); ok {
r1 = returnFunc(ctx, userID, pm)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Repository_ListUserAlarms_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListUserAlarms'
type Repository_ListUserAlarms_Call struct {
*mock.Call
}
// ListUserAlarms is a helper method to define mock.On call
// - ctx context.Context
// - userID string
// - pm alarms.PageMetadata
func (_e *Repository_Expecter) ListUserAlarms(ctx interface{}, userID interface{}, pm interface{}) *Repository_ListUserAlarms_Call {
return &Repository_ListUserAlarms_Call{Call: _e.mock.On("ListUserAlarms", ctx, userID, pm)}
}
func (_c *Repository_ListUserAlarms_Call) Run(run func(ctx context.Context, userID string, pm alarms.PageMetadata)) *Repository_ListUserAlarms_Call {
_c.Call.Run(func(args mock.Arguments) {
var arg0 context.Context
if args[0] != nil {
arg0 = args[0].(context.Context)
}
var arg1 string
if args[1] != nil {
arg1 = args[1].(string)
}
var arg2 alarms.PageMetadata
if args[2] != nil {
arg2 = args[2].(alarms.PageMetadata)
}
run(
arg0,
arg1,
arg2,
)
})
return _c
}
func (_c *Repository_ListUserAlarms_Call) Return(alarmsPage alarms.AlarmsPage, err error) *Repository_ListUserAlarms_Call {
_c.Call.Return(alarmsPage, err)
return _c
}
func (_c *Repository_ListUserAlarms_Call) RunAndReturn(run func(ctx context.Context, userID string, pm alarms.PageMetadata) (alarms.AlarmsPage, error)) *Repository_ListUserAlarms_Call {
_c.Call.Return(run)
return _c
}
-69
View File
@@ -1,69 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package alarms
import (
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/permissions"
"github.com/absmach/supermq/pkg/policies"
)
const EntityType = "alarms"
const (
OpAddAlarm = iota
OpViewAlarm
OpListAlarms
OpUpdateAlarm
OpDeleteAlarm
OpAssignAlarm
OpAcknowledgeAlarm
OpResolveAlarm
)
const (
OpAddAlarmStr = "OpAddAlarm"
OpViewAlarmStr = "OpViewAlarm"
OpListAlarmsStr = "OpListAlarms"
OpUpdateAlarmStr = "OpUpdateAlarm"
OpDeleteAlarmStr = "OpDeleteAlarm"
OpAssignAlarmStr = "OpAssignAlarm"
OpAcknowledgeAlarmStr = "OpAcknowledgeAlarm"
OpResolveAlarmStr = "OpResolveAlarm"
)
func GetPermission(op permissions.Operation) (string, error) {
if op < OpAddAlarm || op > OpResolveAlarm {
return "", errors.New("invalid operation")
}
if op == OpUpdateAlarm || op == OpDeleteAlarm {
return policies.AdminPermission, nil
}
return policies.MembershipPermission, nil
}
func OperationName(op permissions.Operation) string {
switch op {
case OpAddAlarm:
return OpAddAlarmStr
case OpViewAlarm:
return OpViewAlarmStr
case OpListAlarms:
return OpListAlarmsStr
case OpUpdateAlarm:
return OpUpdateAlarmStr
case OpDeleteAlarm:
return OpDeleteAlarmStr
case OpAssignAlarm:
return OpAssignAlarmStr
case OpAcknowledgeAlarm:
return OpAcknowledgeAlarmStr
case OpResolveAlarm:
return OpResolveAlarmStr
default:
return "unknown"
}
}
+47
View File
@@ -0,0 +1,47 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package operations
import "github.com/absmach/supermq/pkg/permissions"
const EntityType = "alarm"
// Alarm Operations.
const (
OpViewAlarm permissions.Operation = iota
OpDeleteAlarm
OpListAlarms
OpAssignAlarm
OpAcknowledgeAlarm
OpResolveAlarm
)
func OperationDetails() map[permissions.Operation]permissions.OperationDetails {
return map[permissions.Operation]permissions.OperationDetails{
OpViewAlarm: {
Name: "view",
PermissionRequired: true,
},
OpDeleteAlarm: {
Name: "delete",
PermissionRequired: true,
},
OpListAlarms: {
Name: "list",
PermissionRequired: true,
},
OpAssignAlarm: {
Name: "assign",
PermissionRequired: true,
},
OpAcknowledgeAlarm: {
Name: "acknowledge",
PermissionRequired: true,
},
OpResolveAlarm: {
Name: "resolve",
PermissionRequired: true,
},
}
}
+46 -26
View File
@@ -20,6 +20,10 @@ import (
"github.com/jmoiron/sqlx"
)
const alarmColumns = `alarms.id, alarms.rule_id, alarms.domain_id, alarms.channel_id, alarms.client_id, alarms.subtopic, alarms.measurement, alarms.value, alarms.unit,
alarms.threshold, alarms.cause, alarms.status, alarms.severity, alarms.assignee_id, alarms.created_at, alarms.updated_at, alarms.updated_by, alarms.assigned_at,
alarms.assigned_by, alarms.acknowledged_at, alarms.acknowledged_by, alarms.resolved_at, alarms.resolved_by, alarms.metadata`
type repository struct {
db *sqlx.DB
}
@@ -183,32 +187,49 @@ func (r *repository) ViewAlarm(ctx context.Context, alarmID, domainID string) (a
return alarm, nil
}
func (r *repository) ListAlarms(ctx context.Context, pm alarms.PageMetadata) (alarms.AlarmsPage, error) {
func (r *repository) ListAllAlarms(ctx context.Context, pm alarms.PageMetadata) (alarms.AlarmsPage, error) {
query, err := pageQuery(pm)
if err != nil {
return alarms.AlarmsPage{}, errors.Wrap(repoerr.ErrViewEntity, err)
}
comQuery := fmt.Sprintf(`SELECT %s FROM alarms %s`, alarmColumns, query)
return r.alarmsPage(ctx, comQuery, pm)
}
func (r *repository) ListUserAlarms(ctx context.Context, userID string, pm alarms.PageMetadata) (alarms.AlarmsPage, error) {
query, err := pageQuery(pm)
if err != nil {
return alarms.AlarmsPage{}, errors.Wrap(repoerr.ErrViewEntity, err)
}
pm.UserID = userID
comQuery := fmt.Sprintf(`SELECT DISTINCT %s
FROM alarms
INNER JOIN rules_roles rr ON rr.entity_id = alarms.rule_id
INNER JOIN rules_role_members rrm ON rrm.role_id = rr.id AND rrm.member_id = :user_id
%s`, alarmColumns, query)
return r.alarmsPage(ctx, comQuery, pm)
}
func (r *repository) alarmsPage(ctx context.Context, comQuery string, pm alarms.PageMetadata) (alarms.AlarmsPage, error) {
dir := api.DescDir
if pm.Dir == api.AscDir {
dir = api.AscDir
}
orderClause := ""
var orderClause string
switch pm.Order {
case api.CreatedAtOrder:
orderClause = fmt.Sprintf("ORDER BY created_at %s, id %s", dir, dir)
case api.UpdatedAtOrder:
orderClause = fmt.Sprintf("ORDER BY COALESCE(updated_at, created_at) %s, id %s", dir, dir)
default:
orderClause = fmt.Sprintf("ORDER BY COALESCE(updated_at, created_at) %s, id %s", dir, dir)
}
q := fmt.Sprintf(`SELECT id, rule_id, domain_id, channel_id, client_id, subtopic, measurement, value, unit,
threshold, cause, status, severity, assignee_id, created_at, updated_at, updated_by, assigned_at,
assigned_by, acknowledged_at, acknowledged_by, resolved_at, resolved_by, metadata
FROM alarms %s %s LIMIT :limit OFFSET :offset;`, query, orderClause)
q := fmt.Sprintf(`SELECT * FROM (%s) AS sub_query %s LIMIT :limit OFFSET :offset;`, comQuery, orderClause)
cq := fmt.Sprintf(`SELECT COUNT(*) AS total_count FROM (%s) AS sub_query;`, comQuery)
rows, err := r.db.NamedQueryContext(ctx, q, pm)
if err != nil {
@@ -231,8 +252,7 @@ func (r *repository) ListAlarms(ctx context.Context, pm alarms.PageMetadata) (al
items = append(items, a)
}
q = fmt.Sprintf(`SELECT COUNT(*) FROM alarms %s;`, query)
total, err := postgres.Total(ctx, r.db, q, pm)
total, err := postgres.Total(ctx, r.db, cq, pm)
if err != nil {
return alarms.AlarmsPage{}, errors.Wrap(repoerr.ErrViewEntity, err)
}
@@ -444,49 +464,49 @@ func toAlarm(dbr dbAlarm) (alarms.Alarm, error) {
func pageQuery(pm alarms.PageMetadata) (string, error) {
var query []string
if pm.DomainID != "" {
query = append(query, "domain_id = :domain_id")
query = append(query, "alarms.domain_id = :domain_id")
}
if pm.RuleID != "" {
query = append(query, "rule_id = :rule_id")
query = append(query, "alarms.rule_id = :rule_id")
}
if pm.ChannelID != "" {
query = append(query, "channel_id = :channel_id")
query = append(query, "alarms.channel_id = :channel_id")
}
if pm.Subtopic != "" {
query = append(query, "subtopic = :subtopic")
query = append(query, "alarms.subtopic = :subtopic")
}
if pm.ClientID != "" {
query = append(query, "client_id = :client_id")
query = append(query, "alarms.client_id = :client_id")
}
if pm.Measurement != "" {
query = append(query, "measurement = :measurement")
query = append(query, "alarms.measurement = :measurement")
}
if pm.Status != alarms.AllStatus {
query = append(query, "status = :status")
query = append(query, "alarms.status = :status")
}
if pm.Severity != math.MaxUint8 {
query = append(query, "severity = :severity")
query = append(query, "alarms.severity = :severity")
}
if pm.AssigneeID != "" {
query = append(query, "assignee_id = :assignee_id")
query = append(query, "alarms.assignee_id = :assignee_id")
}
if pm.UpdatedBy != "" {
query = append(query, "updated_by = :updated_by")
query = append(query, "alarms.updated_by = :updated_by")
}
if pm.ResolvedBy != "" {
query = append(query, "resolved_by = :resolved_by")
query = append(query, "alarms.resolved_by = :resolved_by")
}
if pm.AcknowledgedBy != "" {
query = append(query, "acknowledged_by = :acknowledged_by")
query = append(query, "alarms.acknowledged_by = :acknowledged_by")
}
if pm.AssignedBy != "" {
query = append(query, "assigned_by = :assigned_by")
query = append(query, "alarms.assigned_by = :assigned_by")
}
if !pm.CreatedFrom.IsZero() {
query = append(query, "created_at >= :created_from")
query = append(query, "alarms.created_at >= :created_from")
}
if !pm.CreatedTo.IsZero() {
query = append(query, "created_at <= :created_to")
query = append(query, "alarms.created_at <= :created_to")
}
var emq string
+179 -1
View File
@@ -403,7 +403,7 @@ func TestListAlarms(t *testing.T) {
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
alarms, err := repo.ListAlarms(context.Background(), tc.pm)
alarms, err := repo.ListAllAlarms(context.Background(), tc.pm)
if tc.err != nil {
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
@@ -415,6 +415,184 @@ func TestListAlarms(t *testing.T) {
}
}
func TestListUserAlarms(t *testing.T) {
t.Cleanup(func() {
_, err := db.Exec("DELETE FROM alarms")
require.Nil(t, err, fmt.Sprintf("clean alarms unexpected error: %s", err))
_, err = db.Exec("DELETE FROM rules")
require.Nil(t, err, fmt.Sprintf("clean rules unexpected error: %s", err))
})
repo := postgres.NewAlarmsRepo(db)
domainID := generateUUID(t)
userID := generateUUID(t)
otherUserID := generateUUID(t)
adminUserID := generateUUID(t)
// Create 10 rules and 10 alarms referencing them.
// Assign userID to the first 6 rules via role membership.
var ruleIDs []string
var createdAlarms []alarms.Alarm
for i := range 10 {
ruleID := generateUUID(t)
_, err := db.Exec(`INSERT INTO rules (id, name, domain_id, status, logic_type, logic_value) VALUES ($1, $2, $3, 0, 0, '')`,
ruleID, fmt.Sprintf("rule-%d", i), domainID)
require.Nil(t, err, fmt.Sprintf("insert rule unexpected error: %s", err))
ruleIDs = append(ruleIDs, ruleID)
alarm := alarms.Alarm{
ID: generateUUID(t),
RuleID: ruleID,
DomainID: domainID,
ChannelID: generateUUID(t),
ClientID: generateUUID(t),
Measurement: namegen.Generate(),
Value: namegen.Generate(),
Unit: namegen.Generate(),
Threshold: namegen.Generate(),
Cause: namegen.Generate(),
Status: 0,
AssigneeID: generateUUID(t),
CreatedAt: time.Now().UTC().Add(time.Duration(i) * time.Minute),
}
alarm, err = repo.CreateAlarm(context.Background(), alarm)
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
createdAlarms = append(createdAlarms, alarm)
}
// Assign userID to the first 6 rules via rules_roles + rules_role_members.
userRoleIDs := make([]string, 6)
for i := range 6 {
roleID := generateUUID(t)
userRoleIDs[i] = roleID
_, err := db.Exec(`INSERT INTO rules_roles (id, name, entity_id) VALUES ($1, $2, $3)`, roleID, "admin", ruleIDs[i])
require.Nil(t, err, fmt.Sprintf("insert rules_roles unexpected error: %s", err))
_, err = db.Exec(`INSERT INTO rules_role_members (role_id, member_id, entity_id) VALUES ($1, $2, $3)`, roleID, userID, ruleIDs[i])
require.Nil(t, err, fmt.Sprintf("insert rules_role_members unexpected error: %s", err))
}
for i := range 10 {
var roleID string
if i < 6 {
roleID = userRoleIDs[i]
} else {
roleID = generateUUID(t)
_, err := db.Exec(`INSERT INTO rules_roles (id, name, entity_id) VALUES ($1, $2, $3)`, roleID, "admin", ruleIDs[i])
require.Nil(t, err, fmt.Sprintf("insert rules_roles unexpected error: %s", err))
}
_, err := db.Exec(`INSERT INTO rules_role_members (role_id, member_id, entity_id) VALUES ($1, $2, $3)`, roleID, adminUserID, ruleIDs[i])
require.Nil(t, err, fmt.Sprintf("insert rules_role_members unexpected error: %s", err))
}
_ = createdAlarms
cases := []struct {
desc string
userID string
pm alarms.PageMetadata
count int
err error
}{
{
desc: "list user alarms returns only accessible alarms",
userID: userID,
pm: alarms.PageMetadata{
Offset: 0,
Limit: 100,
},
count: 6,
err: nil,
},
{
desc: "list user alarms with limit",
userID: userID,
pm: alarms.PageMetadata{
Offset: 0,
Limit: 3,
},
count: 3,
err: nil,
},
{
desc: "list user alarms with offset",
userID: userID,
pm: alarms.PageMetadata{
Offset: 4,
Limit: 100,
},
count: 2,
err: nil,
},
{
desc: "list user alarms with domain filter",
userID: userID,
pm: alarms.PageMetadata{
DomainID: domainID,
Offset: 0,
Limit: 100,
},
count: 6,
err: nil,
},
{
desc: "list user alarms with non-existing domain returns 0",
userID: userID,
pm: alarms.PageMetadata{
DomainID: generateUUID(t),
Offset: 0,
Limit: 100,
},
count: 0,
err: nil,
},
{
desc: "list alarms for user with no role assignments returns 0",
userID: otherUserID,
pm: alarms.PageMetadata{
Offset: 0,
Limit: 100,
},
count: 0,
err: nil,
},
{
desc: "list alarms for admin user with role on all rules returns all alarms",
userID: adminUserID,
pm: alarms.PageMetadata{
Offset: 0,
Limit: 100,
},
count: 10,
err: nil,
},
{
desc: "list user alarms ordered by created_at ascending",
userID: userID,
pm: alarms.PageMetadata{
Offset: 0,
Limit: 100,
Order: "created_at",
Dir: "asc",
},
count: 6,
err: nil,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
page, err := repo.ListUserAlarms(context.Background(), tc.userID, tc.pm)
if tc.err != nil {
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
return
}
require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err))
assert.Equal(t, tc.count, len(page.Alarms), fmt.Sprintf("%s: expected %d alarms, got %d", tc.desc, tc.count, len(page.Alarms)))
})
}
}
func TestDeleteAlarm(t *testing.T) {
t.Cleanup(func() {
_, err := db.Exec("DELETE FROM alarms")
+15 -3
View File
@@ -4,13 +4,16 @@
package postgres
import (
rpostgres "github.com/absmach/magistrala/re/postgres"
"github.com/absmach/supermq/pkg/errors"
repoerr "github.com/absmach/supermq/pkg/errors/repository"
_ "github.com/jackc/pgx/v5/stdlib" // required for SQL access
migrate "github.com/rubenv/sql-migrate"
)
// Migration of Users service.
func Migration() *migrate.MemoryMigrationSource {
return &migrate.MemoryMigrationSource{
// Migration of Alarms service.
func Migration() (*migrate.MemoryMigrationSource, error) {
alarmsMigration := &migrate.MemoryMigrationSource{
Migrations: []*migrate.Migration{
{
Id: "alarms_01",
@@ -50,4 +53,13 @@ func Migration() *migrate.MemoryMigrationSource {
},
},
}
rulesMigration, err := rpostgres.Migration()
if err != nil {
return &migrate.MemoryMigrationSource{}, errors.Wrap(repoerr.ErrRoleMigration, err)
}
alarmsMigration.Migrations = append(alarmsMigration.Migrations, rulesMigration.Migrations...)
return alarmsMigration, nil
}
+5 -1
View File
@@ -75,7 +75,11 @@ func TestMain(m *testing.M) {
SSLRootCert: "",
}
if db, err = postgres.Setup(dbConfig, *apostgres.Migration()); err != nil {
migration, err := apostgres.Migration()
if err != nil {
log.Fatalf("Could not get migration: %s", err)
}
if db, err = postgres.Setup(dbConfig, *migration); err != nil {
log.Fatalf("Could not setup test DB connection: %s", err)
}
+5 -1
View File
@@ -43,6 +43,7 @@ func (s *service) CreateAlarm(ctx context.Context, alarm Alarm) error {
if _, err = s.repo.CreateAlarm(ctx, alarm); err != nil && err != repoerr.ErrNotFound {
return err
}
return nil
}
@@ -51,7 +52,10 @@ func (s *service) ViewAlarm(ctx context.Context, session authn.Session, alarmID
}
func (s *service) ListAlarms(ctx context.Context, session authn.Session, pm PageMetadata) (AlarmsPage, error) {
return s.repo.ListAlarms(ctx, pm)
if session.SuperAdmin {
return s.repo.ListAllAlarms(ctx, pm)
}
return s.repo.ListUserAlarms(ctx, session.UserID, pm)
}
func (s *service) DeleteAlarm(ctx context.Context, session authn.Session, alarmID string) error {
+11 -25
View File
@@ -6,7 +6,6 @@ package alarms_test
import (
"context"
"fmt"
"math"
"testing"
"time"
@@ -22,9 +21,13 @@ import (
var idp = uuid.New()
func newService(t *testing.T, repo *mocks.Repository) alarms.Service {
return alarms.NewService(idp, repo)
}
func TestCreateAlarm(t *testing.T) {
repo := new(mocks.Repository)
svc := alarms.NewService(idp, repo)
svc := newService(t, repo)
ts := time.Now()
cases := []struct {
desc string
@@ -69,33 +72,16 @@ func TestCreateAlarm(t *testing.T) {
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
repoCall := repo.On("CreateAlarm", context.Background(), mock.Anything).Return(tc.alarm, tc.err)
repoCall1 := repo.On("ListAlarms", context.Background(), alarms.PageMetadata{
Offset: 0, Limit: 1,
DomainID: tc.alarm.DomainID,
ChannelID: tc.alarm.ChannelID,
ClientID: tc.alarm.ClientID,
Subtopic: tc.alarm.Subtopic,
Measurement: tc.alarm.Measurement,
RuleID: tc.alarm.RuleID,
Status: alarms.AllStatus,
Severity: math.MaxUint8,
CreatedTo: tc.alarm.CreatedAt,
}).Return(alarms.AlarmsPage{}, tc.err)
err := svc.CreateAlarm(context.Background(), tc.alarm)
if tc.err != nil {
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
return
}
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
repoCall.Unset()
repoCall1.Unset()
})
}
}
func TestViewAlarm(t *testing.T) {
repo := new(mocks.Repository)
svc := alarms.NewService(idp, repo)
svc := newService(t, repo)
cases := []struct {
desc string
@@ -134,7 +120,7 @@ func TestViewAlarm(t *testing.T) {
func TestUpdateAlarm(t *testing.T) {
repo := new(mocks.Repository)
svc := alarms.NewService(idp, repo)
svc := newService(t, repo)
cases := []struct {
desc string
@@ -192,7 +178,7 @@ func TestUpdateAlarm(t *testing.T) {
func TestListAlarms(t *testing.T) {
repo := new(mocks.Repository)
svc := alarms.NewService(idp, repo)
svc := newService(t, repo)
cases := []struct {
desc string
@@ -219,7 +205,7 @@ func TestListAlarms(t *testing.T) {
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
s := authn.Session{DomainID: tc.pm.DomainID}
repoCall := repo.On("ListAlarms", context.Background(), tc.pm).Return(tc.page, tc.err)
repoCall := repo.On("ListUserAlarms", context.Background(), s.UserID, tc.pm).Return(tc.page, tc.err)
_, err := svc.ListAlarms(context.Background(), s, tc.pm)
if tc.err != nil {
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
@@ -233,7 +219,7 @@ func TestListAlarms(t *testing.T) {
func TestDeleteAlarm(t *testing.T) {
repo := new(mocks.Repository)
svc := alarms.NewService(idp, repo)
svc := newService(t, repo)
cases := []struct {
desc string
+76 -7
View File
@@ -15,17 +15,23 @@ import (
"github.com/absmach/magistrala/alarms/brokers"
"github.com/absmach/magistrala/alarms/consumer"
"github.com/absmach/magistrala/alarms/middleware"
"github.com/absmach/magistrala/alarms/operations"
alarmsRepo "github.com/absmach/magistrala/alarms/postgres"
"github.com/absmach/magistrala/pkg/prometheus"
rconsumer "github.com/absmach/magistrala/pkg/re/events/consumer"
rpostgres "github.com/absmach/magistrala/re/postgres"
dpostgres "github.com/absmach/supermq/domains/postgres"
smqlog "github.com/absmach/supermq/logger"
smqauthn "github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/authn/authsvc"
authsvcAuthz "github.com/absmach/supermq/pkg/authz/authsvc"
dconsumer "github.com/absmach/supermq/pkg/domains/events/consumer"
domainsAuthz "github.com/absmach/supermq/pkg/domains/grpcclient"
"github.com/absmach/supermq/pkg/grpcclient"
"github.com/absmach/supermq/pkg/jaeger"
"github.com/absmach/supermq/pkg/messaging"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
"github.com/absmach/supermq/pkg/permissions"
"github.com/absmach/supermq/pkg/postgres"
"github.com/absmach/supermq/pkg/server"
httpserver "github.com/absmach/supermq/pkg/server/http"
@@ -42,14 +48,18 @@ const (
defDB = "alarms"
defSvcHTTPPort = "8050"
envPrefixDomains = "SMQ_DOMAINS_GRPC_"
alarmEntity = "alarm"
)
type config struct {
LogLevel string `env:"MG_ALARMS_LOG_LEVEL" envDefault:"info"`
BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
InstanceID string `env:"MG_ALARMS_INSTANCE_ID" envDefault:""`
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
LogLevel string `env:"MG_ALARMS_LOG_LEVEL" envDefault:"info"`
BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
InstanceID string `env:"MG_ALARMS_INSTANCE_ID" envDefault:""`
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
ESConsumerName string `env:"MG_ALARMS_EVENT_CONSUMER" envDefault:"alarms"`
PermissionsFile string `env:"SMQ_PERMISSIONS_FILE" envDefault:"permission.yaml"`
}
func main() {
@@ -87,7 +97,14 @@ func main() {
logger.Error(err.Error())
}
db, err := postgres.Setup(dbConfig, *alarmsRepo.Migration())
migrations, err := alarmsRepo.Migration()
if err != nil {
logger.Error(fmt.Sprintf("failed to load migrations: %s", err))
exitCode = 1
return
}
db, err := postgres.Setup(dbConfig, *migrations)
if err != nil {
logger.Error(err.Error())
exitCode = 1
@@ -138,11 +155,63 @@ func main() {
logger.Info("AuthZ successfully connected to auth gRPC server " + authzHandler.Secure())
ddatabase := postgres.NewDatabase(db, dbConfig, tracer)
drepo := dpostgres.NewRepository(ddatabase)
if err := dconsumer.DomainsEventsSubscribe(ctx, drepo, cfg.ESURL, cfg.ESConsumerName, logger); err != nil {
logger.Error(fmt.Sprintf("failed to create domains event store : %s", err))
exitCode = 1
return
}
rdatabase := postgres.NewDatabase(db, dbConfig, tracer)
rrepo := rpostgres.NewRepository(rdatabase)
if err := rconsumer.RulesEventsSubscribe(ctx, rrepo, cfg.ESURL, cfg.ESConsumerName, logger); err != nil {
logger.Error(fmt.Sprintf("failed to subscribe to rules events: %s", err))
exitCode = 1
return
}
idp := uuid.New()
svc := alarms.NewService(idp, repo)
svc = middleware.NewAuthorizationMiddleware(svc, authz)
permConfig, err := permissions.ParsePermissionsFile(cfg.PermissionsFile)
if err != nil {
logger.Error(fmt.Sprintf("failed to parse permissions file: %s", err))
exitCode = 1
return
}
alarmOps, _, err := permConfig.GetEntityPermissions(alarmEntity)
if err != nil {
logger.Error(fmt.Sprintf("failed to get alarm permissions: %s", err))
exitCode = 1
return
}
entitiesOps, err := permissions.NewEntitiesOperations(
permissions.EntitiesPermission{
operations.EntityType: alarmOps,
},
permissions.EntitiesOperationDetails[permissions.Operation]{
operations.EntityType: operations.OperationDetails(),
},
)
if err != nil {
logger.Error(fmt.Sprintf("failed to create entity operations: %s", err))
exitCode = 1
return
}
svc, err = middleware.NewAuthorizationMiddleware(svc, authz, entitiesOps)
if err != nil {
logger.Error(fmt.Sprintf("failed to create authorization middleware: %s", err))
exitCode = 1
return
}
svc = middleware.NewLoggingMiddleware(logger, svc)
counter, latency := prometheus.MakeMetrics("alarms", "api")
svc = middleware.NewMetricsMiddleware(counter, latency, svc)
+5 -1
View File
@@ -52,6 +52,8 @@ SMQ_AUTH_GRPC_TIMEOUT=300s
SMQ_AUTH_GRPC_CLIENT_CERT=${GRPC_MTLS:+./ssl/certs/auth-grpc-client.crt}
SMQ_AUTH_GRPC_CLIENT_KEY=${GRPC_MTLS:+./ssl/certs/auth-grpc-client.key}
SMQ_AUTH_GRPC_CLIENT_CA_CERTS=${GRPC_MTLS:+./ssl/certs/ca.crt}
SMQ_AUTH_ACCESS_TOKEN_DURATION=1h
SMQ_AUTH_REFRESH_TOKEN_DURATION=24h
#### Clients Client Config
SMQ_CLIENTS_URL=http://clients:9006
@@ -148,6 +150,7 @@ MG_ALARMS_DB_SSL_CERT=
MG_ALARMS_DB_SSL_KEY=
MG_ALARMS_DB_SSL_ROOT_CERT=
MG_ALARMS_INSTANCE_ID=
MG_ALARMS_EVENT_CONSUMER=alarms
### REPORTS
MG_REPORTS_LOG_LEVEL=debug
@@ -416,6 +419,7 @@ MG_BACKEND_OBJECT_STORAGE_ACCESS_KEY=localKey
MG_BACKEND_OBJECT_STORAGE_SECRET_KEY=localSecret
MG_BACKEND_OBJECT_STORAGE_WRITE_TTL=1m
MG_BACKEND_OBJECT_STORAGE_READ_TTL=15m
MG_BACKEND_OBJECT_STORAGE_TTL=15m
#### Auth GRPC Client Config
MG_AUTH_GRPC_URL=auth:7001
@@ -493,4 +497,4 @@ MG_RELEASE_TAG=latest
SMQ_ALLOW_UNVERIFIED_USER=true
# Set to yes to accept the EULA for the UI services. To view the EULA visit: https://github.com/absmach/eula
MG_UI_DOCKER_ACCEPT_EULA=yes
MG_UI_DOCKER_ACCEPT_EULA=no
+2
View File
@@ -397,6 +397,7 @@ services:
MG_ALARMS_DB_SSL_KEY: ${MG_ALARMS_DB_SSL_KEY}
MG_ALARMS_DB_SSL_ROOT_CERT: ${MG_ALARMS_DB_SSL_ROOT_CERT}
SMQ_MESSAGE_BROKER_URL: ${SMQ_MESSAGE_BROKER_URL}
SMQ_ES_URL: ${SMQ_ES_URL}
SMQ_JAEGER_URL: ${SMQ_JAEGER_URL}
SMQ_JAEGER_TRACE_RATIO: ${SMQ_JAEGER_TRACE_RATIO}
SMQ_AUTH_GRPC_URL: ${SMQ_AUTH_GRPC_URL}
@@ -415,6 +416,7 @@ services:
SMQ_SPICEDB_SCHEMA_FILE: ${SMQ_SPICEDB_SCHEMA_FILE}
SMQ_PERMISSIONS_FILE: ${SMQ_PERMISSIONS_FILE}
MG_ALARMS_INSTANCE_ID: ${MG_ALARMS_INSTANCE_ID}
MG_ALARMS_EVENT_CONSUMER: ${MG_ALARMS_EVENT_CONSUMER}
SMQ_ALLOW_UNVERIFIED_USER: ${SMQ_ALLOW_UNVERIFIED_USER}
ports:
- ${MG_ALARMS_HTTP_PORT}:${MG_ALARMS_HTTP_PORT}
+6 -6
View File
@@ -3,13 +3,13 @@
alarm:
operations:
- add: alarm_create_permission
- list: alarm_read_permission
- view: read_permission
- update: update_permission
- enable: update_permission
- disable: update_permission
- delete: delete_permission
- view: alarm_read_permission
- update: alarm_update_permission
- delete: alarm_delete_permission
- assign: alarm_assign_permission
- acknowledge: alarm_acknowledge_permission
- resolve: alarm_resolve_permission
rule:
operations:
+15 -28
View File
@@ -308,7 +308,6 @@ definition domain {
relation group_view_role_users: role#member | team#member
// Magistrala-specific relations
relation alarm_create: role#member | team#member
relation alarm_update: role#member | team#member
relation alarm_read: role#member | team#member
relation alarm_delete: role#member | team#member
@@ -320,9 +319,9 @@ definition domain {
relation rule_add_role_users: role#member | team#member
relation rule_remove_role_users: role#member | team#member
relation rule_view_role_users: role#member | team#member
relation rule_alarm_assign: role#member | team#member
relation rule_alarm_acknowledge: role#member | team#member
relation rule_alarm_resolve: role#member | team#member
relation alarm_assign: role#member | team#member
relation alarm_acknowledge: role#member | team#member
relation alarm_resolve: role#member | team#member
relation report_create: role#member | team#member
relation report_update: role#member | team#member
relation report_read: role#member | team#member
@@ -352,7 +351,7 @@ definition domain {
channel_manage_role + channel_add_role_users + channel_remove_role_users + channel_view_role_users +
group_update + group_membership + group_read + group_delete + group_set_child + group_set_parent +
group_manage_role + group_add_role_users + group_remove_role_users + group_view_role_users +
alarm_create + alarm_update + alarm_read + alarm_delete + rule_create + rule_update + rule_read + rule_delete + rule_manage_role + rule_add_role_users + rule_remove_role_users + rule_view_role_users + rule_alarm_assign + rule_alarm_acknowledge + rule_alarm_resolve + report_create + report_update + report_read + report_delete + report_manage_role + report_add_role_users + report_remove_role_users + report_view_role_users +
alarm_update + alarm_read + alarm_delete + rule_create + rule_update + rule_read + rule_delete + rule_manage_role + rule_add_role_users + rule_remove_role_users + rule_view_role_users + alarm_assign + alarm_acknowledge + alarm_resolve + report_create + report_update + report_read + report_delete + report_manage_role + report_add_role_users + report_remove_role_users + report_view_role_users +
organization->admin
permission admin = (read & update & enable & disable & delete & manage_role & add_role_users & remove_role_users & view_role_users) + organization->admin
@@ -398,7 +397,6 @@ definition domain {
permission group_view_role_users_permission = group_view_role_users + team->group_view_role_users + organization->admin
// Magistrala-specific permissions
permission alarm_create_permission = alarm_create + team->alarm_create + organization->admin
permission alarm_update_permission = alarm_update + team->alarm_update + organization->admin
permission alarm_read_permission = alarm_read + team->alarm_read + organization->admin
permission alarm_delete_permission = alarm_delete + team->alarm_delete + organization->admin
@@ -410,9 +408,9 @@ definition domain {
permission rule_add_role_users_permission = rule_add_role_users + team->rule_add_role_users + organization->admin
permission rule_remove_role_users_permission = rule_remove_role_users + team->rule_remove_role_users + organization->admin
permission rule_view_role_users_permission = rule_view_role_users + team->rule_view_role_users + organization->admin
permission rule_alarm_assign_permission = rule_alarm_assign + team->rule_alarm_assign + organization->admin
permission rule_alarm_acknowledge_permission = rule_alarm_acknowledge + team->rule_alarm_acknowledge + organization->admin
permission rule_alarm_resolve_permission = rule_alarm_resolve + team->rule_alarm_resolve + organization->admin
permission alarm_assign_permission = alarm_assign + team->alarm_assign + organization->admin
permission alarm_acknowledge_permission = alarm_acknowledge + team->alarm_acknowledge + organization->admin
permission alarm_resolve_permission = alarm_resolve + team->alarm_resolve + organization->admin
permission report_create_permission = report_create + team->report_create + organization->admin
permission report_update_permission = report_update + team->report_update + organization->admin
permission report_read_permission = report_read + team->report_read + organization->admin
@@ -512,7 +510,6 @@ definition team {
relation group_view_role_users: role#member | team#member
// Magistrala-specific relations
relation alarm_create: role#member | team#member
relation alarm_update: role#member | team#member
relation alarm_read: role#member | team#member
relation alarm_delete: role#member | team#member
@@ -524,9 +521,9 @@ definition team {
relation rule_add_role_users: role#member | team#member
relation rule_remove_role_users: role#member | team#member
relation rule_view_role_users: role#member | team#member
relation rule_alarm_assign: role#member | team#member
relation rule_alarm_acknowledge: role#member | team#member
relation rule_alarm_resolve: role#member | team#member
relation alarm_assign: role#member | team#member
relation alarm_acknowledge: role#member | team#member
relation alarm_resolve: role#member | team#member
relation report_create: role#member | team#member
relation report_update: role#member | team#member
relation report_read: role#member | team#member
@@ -636,18 +633,6 @@ definition platform {
// Overlay team block consumed by scripts/combine-schema.sh during merge.
definition alarm {
relation domain: domain
relation update: role#member
relation read: role#member
relation delete: role#member
permission update_permission = update + domain->alarm_update_permission
permission read_permission = read + domain->alarm_read_permission
permission delete_permission = delete + domain->alarm_delete_permission
}
definition rule {
relation domain: domain
@@ -660,6 +645,7 @@ relation add_role_users: role#member
relation remove_role_users: role#member
relation view_role_users: role#member
relation alarm_read: role#member
relation alarm_assign: role#member
relation alarm_acknowledge: role#member
relation alarm_resolve: role#member
@@ -673,9 +659,10 @@ permission add_role_users_permission = add_role_users + domain->rule_add_role_us
permission remove_role_users_permission = remove_role_users + domain->rule_remove_role_users_permission
permission view_role_users_permission = view_role_users + domain->rule_view_role_users_permission
permission alarm_assign_permission = alarm_assign + domain->rule_alarm_assign_permission
permission alarm_acknowledge_permission = alarm_acknowledge + domain->rule_alarm_acknowledge_permission
permission alarm_resolve_permission = alarm_resolve + domain->rule_alarm_resolve_permission
permission alarm_read_permission = alarm_read + domain->alarm_read_permission
permission alarm_assign_permission = alarm_assign + domain->alarm_assign_permission
permission alarm_acknowledge_permission = alarm_acknowledge + domain->alarm_acknowledge_permission
permission alarm_resolve_permission = alarm_resolve + domain->alarm_resolve_permission
}
definition report {
+15 -29
View File
@@ -28,7 +28,6 @@
definition domain {
// Magistrala-specific relations
relation alarm_create: role#member | team#member
relation alarm_update: role#member | team#member
relation alarm_read: role#member | team#member
relation alarm_delete: role#member | team#member
@@ -41,9 +40,9 @@ definition domain {
relation rule_add_role_users: role#member | team#member
relation rule_remove_role_users: role#member | team#member
relation rule_view_role_users: role#member | team#member
relation rule_alarm_assign: role#member | team#member
relation rule_alarm_acknowledge: role#member | team#member
relation rule_alarm_resolve: role#member | team#member
relation alarm_assign: role#member | team#member
relation alarm_acknowledge: role#member | team#member
relation alarm_resolve: role#member | team#member
relation report_create: role#member | team#member
relation report_update: role#member | team#member
@@ -55,7 +54,6 @@ definition domain {
relation report_view_role_users: role#member | team#member
// Magistrala-specific permissions
permission alarm_create_permission = alarm_create + team->alarm_create + organization->admin
permission alarm_update_permission = alarm_update + team->alarm_update + organization->admin
permission alarm_read_permission = alarm_read + team->alarm_read + organization->admin
permission alarm_delete_permission = alarm_delete + team->alarm_delete + organization->admin
@@ -68,9 +66,9 @@ definition domain {
permission rule_add_role_users_permission = rule_add_role_users + team->rule_add_role_users + organization->admin
permission rule_remove_role_users_permission = rule_remove_role_users + team->rule_remove_role_users + organization->admin
permission rule_view_role_users_permission = rule_view_role_users + team->rule_view_role_users + organization->admin
permission rule_alarm_assign_permission = rule_alarm_assign + team->rule_alarm_assign + organization->admin
permission rule_alarm_acknowledge_permission = rule_alarm_acknowledge + team->rule_alarm_acknowledge + organization->admin
permission rule_alarm_resolve_permission = rule_alarm_resolve + team->rule_alarm_resolve + organization->admin
permission alarm_assign_permission = alarm_assign + team->alarm_assign + organization->admin
permission alarm_acknowledge_permission = alarm_acknowledge + team->alarm_acknowledge + organization->admin
permission alarm_resolve_permission = alarm_resolve + team->alarm_resolve + organization->admin
permission report_create_permission = report_create + team->report_create + organization->admin
permission report_update_permission = report_update + team->report_update + organization->admin
@@ -82,14 +80,13 @@ definition domain {
permission report_view_role_users_permission = report_view_role_users + team->report_view_role_users + organization->admin
// Explicit extension injected into SuperMQ domain `permission membership`.
permission membership_extension = alarm_create + alarm_update + alarm_read + alarm_delete + rule_create + rule_update + rule_read + rule_delete + rule_manage_role + rule_add_role_users + rule_remove_role_users + rule_view_role_users + rule_alarm_assign + rule_alarm_acknowledge + rule_alarm_resolve + report_create + report_update + report_read + report_delete + report_manage_role + report_add_role_users + report_remove_role_users + report_view_role_users
permission membership_extension = alarm_update + alarm_read + alarm_delete + rule_create + rule_update + rule_read + rule_delete + rule_manage_role + rule_add_role_users + rule_remove_role_users + rule_view_role_users + alarm_assign + alarm_acknowledge + alarm_resolve + report_create + report_update + report_read + report_delete + report_manage_role + report_add_role_users + report_remove_role_users + report_view_role_users
}
// Overlay team block consumed by scripts/combine-schema.sh during merge.
definition team {
relation alarm_create: role#member | team#member
relation alarm_update: role#member | team#member
relation alarm_read: role#member | team#member
relation alarm_delete: role#member | team#member
@@ -102,9 +99,9 @@ definition team {
relation rule_add_role_users: role#member | team#member
relation rule_remove_role_users: role#member | team#member
relation rule_view_role_users: role#member | team#member
relation rule_alarm_assign: role#member | team#member
relation rule_alarm_acknowledge: role#member | team#member
relation rule_alarm_resolve: role#member | team#member
relation alarm_assign: role#member | team#member
relation alarm_acknowledge: role#member | team#member
relation alarm_resolve: role#member | team#member
relation report_create: role#member | team#member
relation report_update: role#member | team#member
@@ -114,19 +111,6 @@ definition team {
relation report_add_role_users: role#member | team#member
relation report_remove_role_users: role#member | team#member
relation report_view_role_users: role#member | team#member
}
definition alarm {
relation domain: domain
relation update: role#member
relation read: role#member
relation delete: role#member
permission update_permission = update + domain->alarm_update_permission
permission read_permission = read + domain->alarm_read_permission
permission delete_permission = delete + domain->alarm_delete_permission
}
definition rule {
@@ -141,6 +125,7 @@ relation add_role_users: role#member
relation remove_role_users: role#member
relation view_role_users: role#member
relation alarm_read: role#member
relation alarm_assign: role#member
relation alarm_acknowledge: role#member
relation alarm_resolve: role#member
@@ -154,9 +139,10 @@ permission add_role_users_permission = add_role_users + domain->rule_add_role_us
permission remove_role_users_permission = remove_role_users + domain->rule_remove_role_users_permission
permission view_role_users_permission = view_role_users + domain->rule_view_role_users_permission
permission alarm_assign_permission = alarm_assign + domain->rule_alarm_assign_permission
permission alarm_acknowledge_permission = alarm_acknowledge + domain->rule_alarm_acknowledge_permission
permission alarm_resolve_permission = alarm_resolve + domain->rule_alarm_resolve_permission
permission alarm_read_permission = alarm_read + domain->alarm_read_permission
permission alarm_assign_permission = alarm_assign + domain->alarm_assign_permission
permission alarm_acknowledge_permission = alarm_acknowledge + domain->alarm_acknowledge_permission
permission alarm_resolve_permission = alarm_resolve + domain->alarm_resolve_permission
}
definition report {
-4
View File
@@ -238,8 +238,6 @@ SMQ_USERS_ADMIN_USERNAME=admin
SMQ_USERS_ADMIN_FIRST_NAME=super
SMQ_USERS_ADMIN_LAST_NAME=admin
SMQ_USERS_PASS_REGEX=^.{8,}$
SMQ_USERS_ACCESS_TOKEN_DURATION=15m
SMQ_USERS_REFRESH_TOKEN_DURATION=24h
SMQ_USERS_HTTP_HOST=users
SMQ_USERS_HTTP_PORT=9002
SMQ_USERS_HTTP_SERVER_CERT=
@@ -263,8 +261,6 @@ SMQ_USERS_SECRET_KEY=HyE2D4RUt9nnKG6v8zKEqAp6g6ka8hhZsqUpzgKvnwpXrNVQSH
SMQ_USERS_ADMIN_EMAIL=admin@example.com
SMQ_USERS_ADMIN_PASSWORD=12345678
SMQ_USERS_PASS_REGEX=^.{8,}$
SMQ_USERS_ACCESS_TOKEN_DURATION=15m
SMQ_USERS_REFRESH_TOKEN_DURATION=24h
SMQ_USERS_ALLOW_SELF_REGISTER=true
SMQ_OAUTH_UI_REDIRECT_URL=http://localhost:9095${SMQ_UI_PATH_PREFIX}/tokens/secure
SMQ_OAUTH_UI_ERROR_URL=http://localhost:9095${SMQ_UI_PATH_PREFIX}/error
+1 -1
View File
@@ -1,7 +1,7 @@
# Copyright (c) Abstract Machines
# SPDX-License-Identifier: Apache-2.0
FROM golang:1.26.0-alpine3.22 AS builder
FROM golang:1.26.1-alpine3.22 AS builder
ARG SVC
ARG GOARCH
ARG GOARM
@@ -862,8 +862,6 @@ services:
SMQ_USERS_ADMIN_FIRST_NAME: ${SMQ_USERS_ADMIN_FIRST_NAME}
SMQ_USERS_ADMIN_LAST_NAME: ${SMQ_USERS_ADMIN_LAST_NAME}
SMQ_USERS_PASS_REGEX: ${SMQ_USERS_PASS_REGEX}
SMQ_USERS_ACCESS_TOKEN_DURATION: ${SMQ_USERS_ACCESS_TOKEN_DURATION}
SMQ_USERS_REFRESH_TOKEN_DURATION: ${SMQ_USERS_REFRESH_TOKEN_DURATION}
SMQ_USERS_HTTP_HOST: ${SMQ_USERS_HTTP_HOST}
SMQ_USERS_HTTP_PORT: ${SMQ_USERS_HTTP_PORT}
SMQ_USERS_HTTP_SERVER_CERT: ${SMQ_USERS_HTTP_SERVER_CERT}
+6 -3
View File
@@ -52,9 +52,6 @@ require (
github.com/authzed/cel-go v0.20.2 // indirect
github.com/authzed/spicedb v1.49.2 // indirect
github.com/ccoveille/go-safecast/v2 v2.0.0 // indirect
github.com/containerd/errdefs v1.0.0 // indirect
github.com/containerd/errdefs/pkg v0.3.0 // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/go-errors/errors v1.5.1 // indirect
github.com/go-logr/zerologr v1.2.3 // indirect
@@ -67,6 +64,12 @@ require (
sigs.k8s.io/controller-runtime v0.22.4 // indirect
)
require (
github.com/containerd/errdefs v1.0.0 // indirect
github.com/containerd/errdefs/pkg v0.3.0 // indirect
github.com/distribution/reference v0.6.0 // indirect
)
require (
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.11-20251209175733-2a1774d88802.1 // indirect
dario.cat/mergo v1.0.2 // indirect
+1
View File
@@ -6,4 +6,5 @@ package policies
const (
RulesType = "rules"
ReportsType = "reports"
AlarmsType = "alarms"
)
+204
View File
@@ -0,0 +1,204 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package consumer
import (
"encoding/json"
"time"
"github.com/absmach/magistrala/pkg/schedule"
"github.com/absmach/magistrala/re"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/roles"
rconsumer "github.com/absmach/supermq/pkg/roles/rolemanager/events/consumer"
)
var (
errDecodeAddRuleEvent = errors.New("failed to decode rule add event")
errDecodeUpdateRuleEvent = errors.New("failed to decode rule update event")
errDecodeUpdateRuleTagsEvent = errors.New("failed to decode rule update tags event")
errDecodeUpdateRuleScheduleEvent = errors.New("failed to decode rule update schedule event")
errDecodeEnableRuleEvent = errors.New("failed to decode rule enable event")
errDecodeDisableRuleEvent = errors.New("failed to decode rule disable event")
errDecodeRemoveRuleEvent = errors.New("failed to decode rule remove event")
errID = errors.New("missing or invalid 'id'")
errName = errors.New("missing or invalid 'name'")
errTags = errors.New("invalid 'tags'")
errStatus = errors.New("missing or invalid 'status'")
errConvertStatus = errors.New("failed to convert status")
errCreatedBy = errors.New("missing or invalid 'created_by'")
errCreatedAt = errors.New("failed to parse 'created_at' time")
errUpdatedAt = errors.New("failed to parse 'updated_at' time")
errDecodeLogic = errors.New("failed to decode 'logic'")
errDecodeSchedule = errors.New("failed to decode 'schedule'")
)
// ToRule decodes a map[string]any event payload into a re.Rule.
func ToRule(data map[string]any) (re.Rule, error) {
var r re.Rule
id, ok := data["id"].(string)
if !ok {
return re.Rule{}, errID
}
r.ID = id
name, ok := data["name"].(string)
if !ok {
return re.Rule{}, errName
}
r.Name = name
stat, ok := data["status"].(string)
if !ok {
return re.Rule{}, errStatus
}
st, err := re.ToStatus(stat)
if err != nil {
return re.Rule{}, errors.Wrap(errConvertStatus, err)
}
r.Status = st
cby, ok := data["created_by"].(string)
if !ok {
return re.Rule{}, errCreatedBy
}
r.CreatedBy = cby
cat, ok := data["created_at"].(string)
if !ok {
return re.Rule{}, errCreatedAt
}
ct, err := time.Parse(re.TimeLayout, cat)
if err != nil {
return re.Rule{}, errors.Wrap(errCreatedAt, err)
}
r.CreatedAt = ct
if domain, ok := data["domain"].(string); ok {
r.DomainID = domain
}
if itags, ok := data["tags"].([]any); ok {
tags, err := rconsumer.ToStrings(itags)
if err != nil {
return re.Rule{}, errors.Wrap(errTags, err)
}
r.Tags = tags
}
if meta, ok := data["metadata"].(map[string]any); ok {
r.Metadata = meta
}
if uby, ok := data["updated_by"].(string); ok {
r.UpdatedBy = uby
}
if uat, ok := data["updated_at"].(string); ok {
ut, err := time.Parse(re.TimeLayout, uat)
if err != nil {
return re.Rule{}, errors.Wrap(errUpdatedAt, err)
}
r.UpdatedAt = ut
}
if ic, ok := data["input_channel"].(string); ok {
r.InputChannel = ic
}
if it, ok := data["input_topic"].(string); ok {
r.InputTopic = it
}
if rawLogic, ok := data["logic"].(map[string]any); ok {
b, err := json.Marshal(rawLogic)
if err != nil {
return re.Rule{}, errors.Wrap(errDecodeLogic, err)
}
if err := json.Unmarshal(b, &r.Logic); err != nil {
return re.Rule{}, errors.Wrap(errDecodeLogic, err)
}
}
if rawSched, ok := data["schedule"].(map[string]any); ok {
b, err := json.Marshal(rawSched)
if err != nil {
return re.Rule{}, errors.Wrap(errDecodeSchedule, err)
}
var sched schedule.Schedule
if err := json.Unmarshal(b, &sched); err != nil {
return re.Rule{}, errors.Wrap(errDecodeSchedule, err)
}
r.Schedule = sched
}
return r, nil
}
func decodeAddRuleEvent(data map[string]any) (re.Rule, []roles.RoleProvision, error) {
r, err := ToRule(data)
if err != nil {
return re.Rule{}, nil, errors.Wrap(errDecodeAddRuleEvent, err)
}
var rps []roles.RoleProvision
if irps, ok := data["roles_provisioned"].([]any); ok {
rps, err = rconsumer.ToRoleProvisions(irps)
if err != nil {
return re.Rule{}, nil, errors.Wrap(errDecodeAddRuleEvent, err)
}
}
return r, rps, nil
}
func decodeUpdateRuleEvent(data map[string]any) (re.Rule, error) {
r, err := ToRule(data)
if err != nil {
return re.Rule{}, errors.Wrap(errDecodeUpdateRuleEvent, err)
}
return r, nil
}
func decodeUpdateRuleTagsEvent(data map[string]any) (re.Rule, error) {
r, err := ToRule(data)
if err != nil {
return re.Rule{}, errors.Wrap(errDecodeUpdateRuleTagsEvent, err)
}
return r, nil
}
func decodeUpdateRuleScheduleEvent(data map[string]any) (re.Rule, error) {
r, err := ToRule(data)
if err != nil {
return re.Rule{}, errors.Wrap(errDecodeUpdateRuleScheduleEvent, err)
}
return r, nil
}
func decodeEnableRuleEvent(data map[string]any) (re.Rule, error) {
r, err := ToRule(data)
if err != nil {
return re.Rule{}, errors.Wrap(errDecodeEnableRuleEvent, err)
}
return r, nil
}
func decodeDisableRuleEvent(data map[string]any) (re.Rule, error) {
r, err := ToRule(data)
if err != nil {
return re.Rule{}, errors.Wrap(errDecodeDisableRuleEvent, err)
}
return r, nil
}
func decodeRemoveRuleEvent(data map[string]any) (string, error) {
id, ok := data["id"].(string)
if !ok {
return "", errors.Wrap(errDecodeRemoveRuleEvent, errID)
}
return id, nil
}
+6
View File
@@ -0,0 +1,6 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package consumer contains events consumer for events
// published by the Rules Engine service.
package consumer
+193
View File
@@ -0,0 +1,193 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package consumer
import (
"context"
"log/slog"
"github.com/absmach/magistrala/re"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/events"
"github.com/absmach/supermq/pkg/events/store"
rconsumer "github.com/absmach/supermq/pkg/roles/rolemanager/events/consumer"
)
const (
stream = "events.supermq.rule.*"
create = "rule.create"
update = "rule.update"
updateTags = "rule.update_tags"
updateSchedule = "rule.update_schedule"
enable = "rule.enable"
disable = "rule.disable"
remove = "rule.remove"
)
var (
errNoOperationKey = errors.New("operation key is not found in event message")
errAddRuleEvent = errors.New("failed to consume rule create event")
errUpdateRuleEvent = errors.New("failed to consume rule update event")
errUpdateRuleTagsEvent = errors.New("failed to consume rule update tags event")
errUpdateRuleScheduleEvent = errors.New("failed to consume rule update schedule event")
errEnableRuleEvent = errors.New("failed to consume rule enable event")
errDisableRuleEvent = errors.New("failed to consume rule disable event")
errRemoveRuleEvent = errors.New("failed to consume rule remove event")
)
type eventHandler struct {
repo re.Repository
rolesEventHandler rconsumer.EventHandler
}
func RulesEventsSubscribe(ctx context.Context, repo re.Repository, esURL, esConsumerName string, logger *slog.Logger) error {
subscriber, err := store.NewSubscriber(ctx, esURL, logger)
if err != nil {
return err
}
subConfig := events.SubscriberConfig{
Stream: stream,
Consumer: esConsumerName,
Handler: NewEventHandler(repo),
Ordered: true,
}
return subscriber.Subscribe(ctx, subConfig)
}
// NewEventHandler returns new event store handler.
func NewEventHandler(repo re.Repository) events.EventHandler {
reh := rconsumer.NewEventHandler("rule", repo)
return &eventHandler{
repo: repo,
rolesEventHandler: reh,
}
}
func (es *eventHandler) Handle(ctx context.Context, event events.Event) error {
msg, err := event.Encode()
if err != nil {
return err
}
op, ok := msg["operation"]
if !ok {
return errNoOperationKey
}
switch op {
case create:
return es.addRuleHandler(ctx, msg)
case update:
return es.updateRuleHandler(ctx, msg)
case updateTags:
return es.updateRuleTagsHandler(ctx, msg)
case updateSchedule:
return es.updateRuleScheduleHandler(ctx, msg)
case enable:
return es.enableRuleHandler(ctx, msg)
case disable:
return es.disableRuleHandler(ctx, msg)
case remove:
return es.removeRuleHandler(ctx, msg)
}
return es.rolesEventHandler.Handle(ctx, op, msg)
}
func (es *eventHandler) addRuleHandler(ctx context.Context, data map[string]any) error {
r, rps, err := decodeAddRuleEvent(data)
if err != nil {
return errors.Wrap(errAddRuleEvent, err)
}
if _, err := es.repo.AddRule(ctx, r); err != nil {
return errors.Wrap(errAddRuleEvent, err)
}
if _, err := es.repo.AddRoles(ctx, rps); err != nil {
return errors.Wrap(errAddRuleEvent, err)
}
return nil
}
func (es *eventHandler) updateRuleHandler(ctx context.Context, data map[string]any) error {
r, err := decodeUpdateRuleEvent(data)
if err != nil {
return errors.Wrap(errUpdateRuleEvent, err)
}
if _, err := es.repo.UpdateRule(ctx, r); err != nil {
return errors.Wrap(errUpdateRuleEvent, err)
}
return nil
}
func (es *eventHandler) updateRuleTagsHandler(ctx context.Context, data map[string]any) error {
r, err := decodeUpdateRuleTagsEvent(data)
if err != nil {
return errors.Wrap(errUpdateRuleTagsEvent, err)
}
if _, err := es.repo.UpdateRuleTags(ctx, r); err != nil {
return errors.Wrap(errUpdateRuleTagsEvent, err)
}
return nil
}
func (es *eventHandler) updateRuleScheduleHandler(ctx context.Context, data map[string]any) error {
r, err := decodeUpdateRuleScheduleEvent(data)
if err != nil {
return errors.Wrap(errUpdateRuleScheduleEvent, err)
}
if _, err := es.repo.UpdateRuleSchedule(ctx, r); err != nil {
return errors.Wrap(errUpdateRuleScheduleEvent, err)
}
return nil
}
func (es *eventHandler) enableRuleHandler(ctx context.Context, data map[string]any) error {
r, err := decodeEnableRuleEvent(data)
if err != nil {
return errors.Wrap(errEnableRuleEvent, err)
}
if _, err := es.repo.UpdateRuleStatus(ctx, r); err != nil {
return errors.Wrap(errEnableRuleEvent, err)
}
return nil
}
func (es *eventHandler) disableRuleHandler(ctx context.Context, data map[string]any) error {
r, err := decodeDisableRuleEvent(data)
if err != nil {
return errors.Wrap(errDisableRuleEvent, err)
}
if _, err := es.repo.UpdateRuleStatus(ctx, r); err != nil {
return errors.Wrap(errDisableRuleEvent, err)
}
return nil
}
func (es *eventHandler) removeRuleHandler(ctx context.Context, data map[string]any) error {
id, err := decodeRemoveRuleEvent(data)
if err != nil {
return errors.Wrap(errRemoveRuleEvent, err)
}
if err := es.repo.RemoveRule(ctx, id); err != nil {
return errors.Wrap(errRemoveRuleEvent, err)
}
return nil
}
+1 -1
View File
@@ -25,7 +25,7 @@ func addRuleEndpoint(s re.Service) endpoint.Endpoint {
if err := req.validate(); err != nil {
return addRuleRes{}, err
}
rule, err := s.AddRule(ctx, session, req.Rule)
rule, _, err := s.AddRule(ctx, session, req.Rule)
if err != nil {
return addRuleRes{}, err
}
+2 -1
View File
@@ -26,6 +26,7 @@ import (
authnmocks "github.com/absmach/supermq/pkg/authn/mocks"
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/roles"
"github.com/go-chi/chi/v5"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
@@ -236,7 +237,7 @@ func TestAddRuleEndpoint(t *testing.T) {
}
authCall := authn.On("Authenticate", mock.Anything, tc.token).Return(tc.authnRes, tc.authnErr)
svcCall := svc.On("AddRule", mock.Anything, tc.authnRes, tc.rule).Return(tc.svcRes, tc.svcErr)
svcCall := svc.On("AddRule", mock.Anything, tc.authnRes, tc.rule).Return(tc.svcRes, []roles.RoleProvision{}, tc.svcErr)
res, err := req.make()
assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err))
+4 -1
View File
@@ -9,6 +9,7 @@ import (
"github.com/absmach/magistrala/re"
"github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/events"
"github.com/absmach/supermq/pkg/roles"
)
const (
@@ -59,7 +60,8 @@ func (bre baseRuleEvent) Encode() map[string]any {
}
type createRuleEvent struct {
rule re.Rule
rule re.Rule
rolesProvisioned []roles.RoleProvision
baseRuleEvent
}
@@ -70,6 +72,7 @@ func (cre createRuleEvent) Encode() (map[string]any, error) {
}
maps.Copy(val, cre.baseRuleEvent.Encode())
val["operation"] = ruleCreate
val["roles_provisioned"] = cre.rolesProvisioned
return val, nil
}
+10 -8
View File
@@ -11,6 +11,7 @@ import (
"github.com/absmach/supermq/pkg/events"
"github.com/absmach/supermq/pkg/events/store"
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/roles"
rmEvents "github.com/absmach/supermq/pkg/roles/rolemanager/events"
"github.com/go-chi/chi/v5/middleware"
)
@@ -44,7 +45,7 @@ func NewEventStoreMiddleware(ctx context.Context, svc re.Service, url string) (r
return nil, err
}
res := rmEvents.NewRoleManagerEventStore("alarms", supermqPrefix, svc, publisher)
res := rmEvents.NewRoleManagerEventStore("rules", rulePrefix, svc, publisher)
return &eventStore{
svc: svc,
@@ -53,19 +54,20 @@ func NewEventStoreMiddleware(ctx context.Context, svc re.Service, url string) (r
}, nil
}
func (es *eventStore) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) {
rule, err := es.svc.AddRule(ctx, session, r)
func (es *eventStore) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, []roles.RoleProvision, error) {
rule, rps, err := es.svc.AddRule(ctx, session, r)
if err != nil {
return rule, err
return rule, rps, err
}
event := createRuleEvent{
rule: rule,
baseRuleEvent: newBaseRuleEvent(session, middleware.GetReqID(ctx)),
rule: rule,
rolesProvisioned: rps,
baseRuleEvent: newBaseRuleEvent(session, middleware.GetReqID(ctx)),
}
if err := es.Publish(ctx, CreateStream, event); err != nil {
return rule, err
return rule, rps, err
}
return rule, nil
return rule, rps, nil
}
func (es *eventStore) ListRules(ctx context.Context, session authn.Session, pm re.PageMeta) (re.Page, error) {
+3 -2
View File
@@ -14,6 +14,7 @@ import (
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/permissions"
"github.com/absmach/supermq/pkg/policies"
"github.com/absmach/supermq/pkg/roles"
rolemgr "github.com/absmach/supermq/pkg/roles/rolemanager/middleware"
)
@@ -48,9 +49,9 @@ func AuthorizationMiddleware(svc re.Service, authz smqauthz.Authorization, entit
}, nil
}
func (am *authorizationMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) {
func (am *authorizationMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, []roles.RoleProvision, error) {
if err := am.authorize(ctx, operations.OpAddRule, session, policies.DomainType, session.DomainID); err != nil {
return re.Rule{}, errors.Wrap(errDomainCreateRules, err)
return re.Rule{}, nil, errors.Wrap(errDomainCreateRules, err)
}
return am.svc.AddRule(ctx, session, r)
+3 -2
View File
@@ -15,6 +15,7 @@ import (
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/permissions"
"github.com/absmach/supermq/pkg/policies"
"github.com/absmach/supermq/pkg/roles"
rolemw "github.com/absmach/supermq/pkg/roles/rolemanager/middleware"
)
@@ -47,14 +48,14 @@ func NewCallout(svc re.Service, callout callout.Callout, entitiesOps permissions
}, nil
}
func (cm *calloutMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) {
func (cm *calloutMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, []roles.RoleProvision, error) {
params := map[string]any{
"entities": r,
"count": 1,
}
if err := cm.callOut(ctx, session, operations.OpAddRule, params); err != nil {
return re.Rule{}, err
return re.Rule{}, nil, err
}
return cm.svc.AddRule(ctx, session, r)
+4 -2
View File
@@ -12,6 +12,7 @@ import (
"github.com/absmach/magistrala/re"
"github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/roles"
rolemw "github.com/absmach/supermq/pkg/roles/rolemanager/middleware"
)
@@ -31,7 +32,7 @@ func LoggingMiddleware(svc re.Service, logger *slog.Logger) re.Service {
}
}
func (lm *loggingMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (res re.Rule, err error) {
func (lm *loggingMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (res re.Rule, rps []roles.RoleProvision, err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
@@ -45,7 +46,8 @@ func (lm *loggingMiddleware) AddRule(ctx context.Context, session authn.Session,
}
lm.logger.Info("Add rule completed successfully", args...)
}(time.Now())
return lm.svc.AddRule(ctx, session, r)
res, rps, err = lm.svc.AddRule(ctx, session, r)
return
}
func (lm *loggingMiddleware) ViewRule(ctx context.Context, session authn.Session, id string, withRoles bool) (res re.Rule, err error) {
+2 -1
View File
@@ -10,6 +10,7 @@ import (
"github.com/absmach/magistrala/re"
"github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/roles"
rolemw "github.com/absmach/supermq/pkg/roles/rolemanager/middleware"
"github.com/go-kit/kit/metrics"
)
@@ -32,7 +33,7 @@ func NewMetricsMiddleware(counter metrics.Counter, latency metrics.Histogram, se
}
}
func (mm *metricsMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) {
func (mm *metricsMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, []roles.RoleProvision, error) {
defer func(begin time.Time) {
mm.counter.With("method", "add_rule").Add(1)
mm.latency.With("method", "add_rule").Observe(time.Since(begin).Seconds())
+2 -1
View File
@@ -9,6 +9,7 @@ import (
"github.com/absmach/magistrala/re"
"github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/roles"
rolemw "github.com/absmach/supermq/pkg/roles/rolemanager/middleware"
smqTracing "github.com/absmach/supermq/pkg/tracing"
"go.opentelemetry.io/otel/attribute"
@@ -31,7 +32,7 @@ func NewTracingMiddleware(tracer trace.Tracer, svc re.Service) re.Service {
}
}
func (tm *tracingMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) {
func (tm *tracingMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, []roles.RoleProvision, error) {
ctx, span := smqTracing.StartSpan(ctx, tm.tracer, "add_rule", trace.WithAttributes(
attribute.String("name", r.Name),
attribute.String("domain_id", r.DomainID),
+17 -9
View File
@@ -136,7 +136,7 @@ func (_c *Service_AddRole_Call) RunAndReturn(run func(ctx context.Context, sessi
}
// AddRule provides a mock function for the type Service
func (_mock *Service) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) {
func (_mock *Service) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, []roles.RoleProvision, error) {
ret := _mock.Called(ctx, session, r)
if len(ret) == 0 {
@@ -144,8 +144,9 @@ func (_mock *Service) AddRule(ctx context.Context, session authn.Session, r re.R
}
var r0 re.Rule
var r1 error
if returnFunc, ok := ret.Get(0).(func(context.Context, authn.Session, re.Rule) (re.Rule, error)); ok {
var r1 []roles.RoleProvision
var r2 error
if returnFunc, ok := ret.Get(0).(func(context.Context, authn.Session, re.Rule) (re.Rule, []roles.RoleProvision, error)); ok {
return returnFunc(ctx, session, r)
}
if returnFunc, ok := ret.Get(0).(func(context.Context, authn.Session, re.Rule) re.Rule); ok {
@@ -153,12 +154,19 @@ func (_mock *Service) AddRule(ctx context.Context, session authn.Session, r re.R
} else {
r0 = ret.Get(0).(re.Rule)
}
if returnFunc, ok := ret.Get(1).(func(context.Context, authn.Session, re.Rule) error); ok {
if returnFunc, ok := ret.Get(1).(func(context.Context, authn.Session, re.Rule) []roles.RoleProvision); ok {
r1 = returnFunc(ctx, session, r)
} else {
r1 = ret.Error(1)
if ret.Get(1) != nil {
r1 = ret.Get(1).([]roles.RoleProvision)
}
}
return r0, r1
if returnFunc, ok := ret.Get(2).(func(context.Context, authn.Session, re.Rule) error); ok {
r2 = returnFunc(ctx, session, r)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// Service_AddRule_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddRule'
@@ -197,12 +205,12 @@ func (_c *Service_AddRule_Call) Run(run func(ctx context.Context, session authn.
return _c
}
func (_c *Service_AddRule_Call) Return(rule re.Rule, err error) *Service_AddRule_Call {
_c.Call.Return(rule, err)
func (_c *Service_AddRule_Call) Return(rule re.Rule, roleProvisions []roles.RoleProvision, err error) *Service_AddRule_Call {
_c.Call.Return(rule, roleProvisions, err)
return _c
}
func (_c *Service_AddRule_Call) RunAndReturn(run func(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error)) *Service_AddRule_Call {
func (_c *Service_AddRule_Call) RunAndReturn(run func(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, []roles.RoleProvision, error)) *Service_AddRule_Call {
_c.Call.Return(run)
return _c
}
+5 -3
View File
@@ -21,6 +21,8 @@ const (
GoType
)
const TimeLayout = "2006-01-02T15:04:05.999999Z"
type (
// ScriptType indicates Runtime type for the future versions
// that will support JS or Go runtimes alongside Lua.
@@ -66,7 +68,7 @@ func (r Rule) EventEncode() (map[string]any, error) {
m := map[string]any{
"id": r.ID,
"name": r.Name,
"created_at": r.CreatedAt.Format(time.RFC3339Nano),
"created_at": r.CreatedAt.Format(TimeLayout),
"created_by": r.CreatedBy,
"schedule": r.Schedule.EventEncode(),
"status": r.Status.String(),
@@ -81,7 +83,7 @@ func (r Rule) EventEncode() (map[string]any, error) {
}
if !r.UpdatedAt.IsZero() {
m["updated_at"] = r.UpdatedAt.Format(time.RFC3339Nano)
m["updated_at"] = r.UpdatedAt.Format(TimeLayout)
}
if r.UpdatedBy != "" {
@@ -225,7 +227,7 @@ type Page struct {
type Service interface {
messaging.MessageHandler
AddRule(ctx context.Context, session authn.Session, r Rule) (Rule, error)
AddRule(ctx context.Context, session authn.Session, r Rule) (Rule, []roles.RoleProvision, error)
ViewRule(ctx context.Context, session authn.Session, id string, withRoles bool) (Rule, error)
UpdateRule(ctx context.Context, session authn.Session, r Rule) (Rule, error)
UpdateRuleTags(ctx context.Context, session authn.Session, r Rule) (Rule, error)
+8 -8
View File
@@ -58,17 +58,17 @@ func NewService(repo Repository, runInfo chan pkglog.RunInfo, policy policies.Se
}, nil
}
func (re *re) AddRule(ctx context.Context, session authn.Session, r Rule) (retRule Rule, retErr error) {
func (re *re) AddRule(ctx context.Context, session authn.Session, r Rule) (retRule Rule, retRps []roles.RoleProvision, retErr error) {
if r.Logic.Type == GoType && goKeywordRegex.MatchString(r.Logic.Value) {
return Rule{}, errors.Wrap(svcerr.ErrMalformedEntity, ErrGoroutinesNotAllowed)
return Rule{}, nil, errors.Wrap(svcerr.ErrMalformedEntity, ErrGoroutinesNotAllowed)
}
if r.Logic.Type == GoType && panicRegex.MatchString(r.Logic.Value) {
return Rule{}, errors.Wrap(svcerr.ErrMalformedEntity, ErrPanicNotAllowed)
return Rule{}, nil, errors.Wrap(svcerr.ErrMalformedEntity, ErrPanicNotAllowed)
}
id, err := re.idp.ID()
if err != nil {
return Rule{}, err
return Rule{}, nil, err
}
now := time.Now().UTC()
r.CreatedAt = now
@@ -84,7 +84,7 @@ func (re *re) AddRule(ctx context.Context, session authn.Session, r Rule) (retRu
rule, err := re.repo.AddRule(ctx, r)
if err != nil {
return Rule{}, errors.Wrap(svcerr.ErrCreateEntity, err)
return Rule{}, nil, errors.Wrap(svcerr.ErrCreateEntity, err)
}
defer func() {
@@ -109,12 +109,12 @@ func (re *re) AddRule(ctx context.Context, session authn.Session, r Rule) (retRu
},
}
_, err = re.AddNewEntitiesRoles(ctx, session.DomainID, session.UserID, []string{rule.ID}, optionalPolicies, newBuiltInRoleMembers)
rps, err := re.AddNewEntitiesRoles(ctx, session.DomainID, session.UserID, []string{rule.ID}, optionalPolicies, newBuiltInRoleMembers)
if err != nil {
return Rule{}, errors.Wrap(svcerr.ErrAddPolicies, err)
return Rule{}, nil, errors.Wrap(svcerr.ErrAddPolicies, err)
}
return rule, nil
return rule, rps, nil
}
func (re *re) ViewRule(ctx context.Context, session authn.Session, id string, withRoles bool) (Rule, error) {
+1 -1
View File
@@ -494,7 +494,7 @@ func TestAddRule(t *testing.T) {
policyCall2 := policies.On("DeletePolicies", context.Background(), mock.Anything).Return(tc.deletePolicies).Maybe()
repoCall1 := repo.On("AddRoles", context.Background(), mock.Anything).Return([]roles.RoleProvision{}, tc.addRoleErr)
repoCall2 := repo.On("Remove", context.Background(), mock.Anything).Return(tc.deleteErr).Maybe()
res, err := svc.AddRule(context.Background(), tc.session, tc.rule)
res, _, err := svc.AddRule(context.Background(), tc.session, tc.rule)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
if err == nil {
assert.NotEmpty(t, res.ID, "expected non-empty result in ID")
+1 -1
View File
@@ -227,7 +227,7 @@ func authenticate(ctx context.Context, req listMessagesReq, authn smqauthn.Authe
if err != nil {
return "", "", err
}
if session.Role == smqauthn.AdminRole {
if session.Role == smqauthn.SuperAdminRole {
return session.UserID, policies.UserType, nil
}
+1 -1
View File
@@ -13,7 +13,7 @@ REPO_URL=https://github.com/absmach/supermq
TEMP_DIR="supermq"
DOCKER_DIR="docker"
DOCKER_DST_DIR="../docker"
DEST_DIR="../docker/supermq-docker"
DEST_DIR="../../docker/supermq-docker"
COMBINE_SCHEMA_SCRIPT="./combine-schema.sh"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"