From 2ef8437d8bbd0ef05e614375bf5f560248a6ebdb Mon Sep 17 00:00:00 2001 From: Steve Munene Date: Fri, 13 Mar 2026 16:29:32 +0300 Subject: [PATCH] MG-370 - Add fine grained access control to alarms (#404) * add access control to rules engine Signed-off-by: nyagamunene * add access control to reports Signed-off-by: nyagamunene * add access control to alarms Signed-off-by: nyagamunene * fix failing linter Signed-off-by: nyagamunene * remove unused variables Signed-off-by: nyagamunene * update authorization method Signed-off-by: nyagamunene * revert code Signed-off-by: nyagamunene * remove roles Signed-off-by: nyagamunene * update alarm permissions Signed-off-by: nyagamunene * update alarm permissions Signed-off-by: nyagamunene * address comments Signed-off-by: nyagamunene * fix tests Signed-off-by: nyagamunene * revert endpoint changes Signed-off-by: nyagamunene * fix make fetch Signed-off-by: nyagamunene * revert env variable Signed-off-by: nyagamunene * remove rule prefix Signed-off-by: nyagamunene * remove trailing line Signed-off-by: nyagamunene * remove unused constants Signed-off-by: nyagamunene * re consumer Signed-off-by: nyagamunene * update listing Signed-off-by: nyagamunene * fix tests Signed-off-by: nyagamunene * fix linter Signed-off-by: nyagamunene * fix rule roles interface Signed-off-by: nyagamunene * refactor listing commands Signed-off-by: nyagamunene * fetch supermq Signed-off-by: nyagamunene * address coments Signed-off-by: nyagamunene * update script Signed-off-by: nyagamunene * address comments Signed-off-by: nyagamunene * fetch supermq Signed-off-by: nyagamunene * fix time layout Signed-off-by: nyagamunene * fix failing linter Signed-off-by: nyagamunene * fix failing linter Signed-off-by: nyagamunene * fix role name Signed-off-by: nyagamunene * fix failing linter Signed-off-by: nyagamunene * address comments Signed-off-by: nyagamunene * remove white spaces Signed-off-by: nyagamunene * update check usperadmin method Signed-off-by: nyagamunene * update go mod file Signed-off-by: nyagamunene * fix tests Signed-off-by: nyagamunene * add missing env variable Signed-off-by: nyagamunene --------- Signed-off-by: nyagamunene --- alarms/alarms.go | 4 +- alarms/api/endpoint.go | 2 +- alarms/api/requests.go | 15 ++ alarms/api/transport.go | 6 +- alarms/middleware/authorization.go | 91 +++++++--- alarms/mocks/repository.go | 94 ++++++++-- alarms/operations.go | 69 -------- alarms/operations/operations.go | 47 +++++ alarms/postgres/alarms.go | 72 +++++--- alarms/postgres/alarms_test.go | 180 ++++++++++++++++++- alarms/postgres/init.go | 18 +- alarms/postgres/setup_test.go | 6 +- alarms/service.go | 6 +- alarms/service_test.go | 36 ++-- cmd/alarms/main.go | 83 ++++++++- docker/.env | 6 +- docker/docker-compose.yaml | 2 + docker/permission.yaml | 12 +- docker/spicedb/combined-schema.zed | 43 ++--- docker/spicedb/override-schema.zed | 44 ++--- docker/supermq-docker/.env | 4 - docker/supermq-docker/Dockerfile | 2 +- docker/supermq-docker/docker-compose.yaml | 2 - go.mod | 9 +- pkg/policies/evaluator.go | 1 + pkg/re/events/consumer/decode.go | 204 ++++++++++++++++++++++ pkg/re/events/consumer/doc.go | 6 + pkg/re/events/consumer/stream.go | 193 ++++++++++++++++++++ re/api/endpoints.go | 2 +- re/api/endpoints_test.go | 3 +- re/events/events.go | 5 +- re/events/streams.go | 18 +- re/middleware/authorization.go | 5 +- re/middleware/callout.go | 5 +- re/middleware/logging.go | 6 +- re/middleware/metrics.go | 3 +- re/middleware/tracing.go | 3 +- re/mocks/service.go | 26 ++- re/rule.go | 8 +- re/service.go | 16 +- re/service_test.go | 2 +- readers/api/http/transport.go | 2 +- scripts/supermq.sh | 2 +- 43 files changed, 1071 insertions(+), 292 deletions(-) delete mode 100644 alarms/operations.go create mode 100644 alarms/operations/operations.go create mode 100644 pkg/re/events/consumer/decode.go create mode 100644 pkg/re/events/consumer/doc.go create mode 100644 pkg/re/events/consumer/stream.go diff --git a/alarms/alarms.go b/alarms/alarms.go index 3f7b8fb82..0ff19fd62 100644 --- a/alarms/alarms.go +++ b/alarms/alarms.go @@ -72,6 +72,7 @@ type PageMetadata struct { AssignedBy string `json:"assigned_by" db:"assigned_by"` AcknowledgedBy string `json:"acknowledged_by" db:"acknowledged_by"` ResolvedBy string `json:"resolved_by" db:"resolved_by"` + UserID string `json:"user_id" db:"user_id"` } func (a Alarm) Validate() error { @@ -116,6 +117,7 @@ type Repository interface { CreateAlarm(ctx context.Context, alarm Alarm) (Alarm, error) UpdateAlarm(ctx context.Context, alarm Alarm) (Alarm, error) ViewAlarm(ctx context.Context, alarmID, domainID string) (Alarm, error) - ListAlarms(ctx context.Context, pm PageMetadata) (AlarmsPage, error) + ListAllAlarms(ctx context.Context, pm PageMetadata) (AlarmsPage, error) + ListUserAlarms(ctx context.Context, userID string, pm PageMetadata) (AlarmsPage, error) DeleteAlarm(ctx context.Context, id string) error } diff --git a/alarms/api/endpoint.go b/alarms/api/endpoint.go index 10b881aa9..a6e907440 100644 --- a/alarms/api/endpoint.go +++ b/alarms/api/endpoint.go @@ -16,7 +16,7 @@ import ( func updateAlarmEndpoint(svc alarms.Service) endpoint.Endpoint { return func(ctx context.Context, request any) (any, error) { - req := request.(alarmReq) + req := request.(updateAlarmReq) if err := req.validate(); err != nil { return alarmRes{}, errors.Wrap(apiutil.ErrValidation, err) } diff --git a/alarms/api/requests.go b/alarms/api/requests.go index 87dedaebb..070992a23 100644 --- a/alarms/api/requests.go +++ b/alarms/api/requests.go @@ -23,6 +23,21 @@ func (req alarmReq) validate() error { return nil } +type updateAlarmReq struct { + alarms.Alarm `json:",inline"` +} + +func (req updateAlarmReq) validate() error { + if req.Alarm.ID == "" { + return errors.New("missing alarm id") + } + if req.Alarm.AssigneeID == "" && req.Alarm.AcknowledgedBy == "" && req.Alarm.ResolvedBy == "" { + return errors.New("at least one of assignee_id, acknowledged_by, or resolved_by must be set") + } + + return nil +} + type listAlarmsReq struct { alarms.PageMetadata } diff --git a/alarms/api/transport.go b/alarms/api/transport.go index d94596282..13d207cb2 100644 --- a/alarms/api/transport.go +++ b/alarms/api/transport.go @@ -195,12 +195,12 @@ func decodeAlarmReq(_ context.Context, r *http.Request) (any, error) { func decodeUpdateAlarmReq(_ context.Context, r *http.Request) (any, error) { if !strings.Contains(r.Header.Get("Content-Type"), api.ContentType) { - return alarmReq{}, apiutil.ErrUnsupportedContentType + return updateAlarmReq{}, apiutil.ErrUnsupportedContentType } - req := alarmReq{} + req := updateAlarmReq{} if err := json.NewDecoder(r.Body).Decode(&req.Alarm); err != nil { - return alarmReq{}, errors.Wrap(apiutil.ErrMalformedRequestBody, err) + return updateAlarmReq{}, errors.Wrap(apiutil.ErrMalformedRequestBody, err) } req.Alarm.ID = chi.URLParam(r, "alarmID") diff --git a/alarms/middleware/authorization.go b/alarms/middleware/authorization.go index 49f2a77ab..ca0a8fb3e 100644 --- a/alarms/middleware/authorization.go +++ b/alarms/middleware/authorization.go @@ -7,10 +7,12 @@ import ( "context" "github.com/absmach/magistrala/alarms" + "github.com/absmach/magistrala/alarms/operations" "github.com/absmach/supermq/auth" "github.com/absmach/supermq/pkg/authn" smqauthz "github.com/absmach/supermq/pkg/authz" "github.com/absmach/supermq/pkg/errors" + svcerr "github.com/absmach/supermq/pkg/errors/service" "github.com/absmach/supermq/pkg/permissions" "github.com/absmach/supermq/pkg/policies" ) @@ -22,37 +24,40 @@ var ( ) type authorizationMiddleware struct { - svc alarms.Service - authz smqauthz.Authorization + svc alarms.Service + authz smqauthz.Authorization + entitiesOps permissions.EntitiesOperations[permissions.Operation] } var _ alarms.Service = (*authorizationMiddleware)(nil) -func NewAuthorizationMiddleware(svc alarms.Service, authz smqauthz.Authorization) alarms.Service { - return &authorizationMiddleware{ - svc: svc, - authz: authz, +func NewAuthorizationMiddleware(svc alarms.Service, authz smqauthz.Authorization, entitiesOps permissions.EntitiesOperations[permissions.Operation]) (alarms.Service, error) { + if err := entitiesOps.Validate(); err != nil { + return nil, err } + + return &authorizationMiddleware{ + svc: svc, + authz: authz, + entitiesOps: entitiesOps, + }, nil } -func (am *authorizationMiddleware) CreateAlarm(ctx context.Context, alarm alarms.Alarm) (err error) { +func (am *authorizationMiddleware) CreateAlarm(ctx context.Context, alarm alarms.Alarm) error { return am.svc.CreateAlarm(ctx, alarm) } -func (am *authorizationMiddleware) UpdateAlarm(ctx context.Context, session authn.Session, alarm alarms.Alarm) (dba alarms.Alarm, err error) { - // If assignee is present, check if assignee is member of domain - - if err := am.authorize(ctx, alarms.OpUpdateAlarm, session); err != nil { - return alarms.Alarm{}, errors.Wrap(errDomainUpdateAlarms, err) - } - +func (am *authorizationMiddleware) UpdateAlarm(ctx context.Context, session authn.Session, alarm alarms.Alarm) (alarms.Alarm, error) { if alarm.AssigneeID != "" { - domainUserId := auth.EncodeDomainUserID(session.DomainID, alarm.AssigneeID) + if err := am.authorize(ctx, operations.OpAssignAlarm, session, policies.DomainType, session.DomainID); err != nil { + return alarms.Alarm{}, errors.Wrap(errDomainUpdateAlarms, err) + } + domainUserID := auth.EncodeDomainUserID(session.DomainID, alarm.AssigneeID) if err := am.authz.Authorize(ctx, smqauthz.PolicyReq{ Domain: session.DomainID, SubjectType: policies.UserType, SubjectKind: policies.UsersKind, - Subject: domainUserId, + Subject: domainUserID, Permission: policies.MembershipPermission, ObjectType: policies.DomainType, Object: session.DomainID, @@ -61,11 +66,23 @@ func (am *authorizationMiddleware) UpdateAlarm(ctx context.Context, session auth } } + if alarm.AcknowledgedBy != "" { + if err := am.authorize(ctx, operations.OpAcknowledgeAlarm, session, policies.DomainType, session.DomainID); err != nil { + return alarms.Alarm{}, errors.Wrap(errDomainUpdateAlarms, err) + } + } + + if alarm.ResolvedBy != "" { + if err := am.authorize(ctx, operations.OpResolveAlarm, session, policies.DomainType, session.DomainID); err != nil { + return alarms.Alarm{}, errors.Wrap(errDomainUpdateAlarms, err) + } + } + return am.svc.UpdateAlarm(ctx, session, alarm) } func (am *authorizationMiddleware) DeleteAlarm(ctx context.Context, session authn.Session, id string) error { - if err := am.authorize(ctx, alarms.OpDeleteAlarm, session); err != nil { + if err := am.authorize(ctx, operations.OpDeleteAlarm, session, policies.DomainType, session.DomainID); err != nil { return errors.Wrap(errDomainDeleteAlarms, err) } @@ -77,23 +94,27 @@ func (am *authorizationMiddleware) ListAlarms(ctx context.Context, session authn pm.DomainID = session.DomainID } - if err := am.authorize(ctx, alarms.OpListAlarms, session); err != nil { - return alarms.AlarmsPage{}, errors.Wrap(errDomainViewAlarms, err) + switch err := am.checkSuperAdmin(ctx, session); { + case err == nil: + session.SuperAdmin = true + case errors.Contains(err, svcerr.ErrSuperAdminAction): + default: + return alarms.AlarmsPage{}, err } return am.svc.ListAlarms(ctx, session, pm) } func (am *authorizationMiddleware) ViewAlarm(ctx context.Context, session authn.Session, id string) (alarms.Alarm, error) { - if err := am.authorize(ctx, alarms.OpViewAlarm, session); err != nil { + if err := am.authorize(ctx, operations.OpViewAlarm, session, policies.DomainType, session.DomainID); err != nil { return alarms.Alarm{}, errors.Wrap(errDomainViewAlarms, err) } return am.svc.ViewAlarm(ctx, session, id) } -func (am *authorizationMiddleware) authorize(ctx context.Context, op permissions.Operation, session authn.Session) error { - perm, err := alarms.GetPermission(op) +func (am *authorizationMiddleware) authorize(ctx context.Context, op permissions.Operation, session authn.Session, objType, obj string) error { + perm, err := am.entitiesOps.GetPermission(operations.EntityType, op) if err != nil { return err } @@ -103,19 +124,19 @@ func (am *authorizationMiddleware) authorize(ctx context.Context, op permissions SubjectType: policies.UserType, SubjectKind: policies.UsersKind, Subject: session.DomainUserID, - Object: session.DomainID, - ObjectType: policies.DomainType, - Permission: perm, + Object: obj, + ObjectType: objType, + Permission: perm.String(), } var pat *smqauthz.PATReq if session.PatID != "" { - opName := alarms.OperationName(op) + opName := am.entitiesOps.OperationName(operations.EntityType, op) pat = &smqauthz.PATReq{ UserID: session.UserID, PatID: session.PatID, EntityID: session.DomainID, - EntityType: alarms.EntityType, + EntityType: operations.EntityType, Operation: opName, Domain: session.DomainID, } @@ -127,3 +148,19 @@ func (am *authorizationMiddleware) authorize(ctx context.Context, op permissions return nil } + +func (am *authorizationMiddleware) checkSuperAdmin(ctx context.Context, session authn.Session) error { + if session.Role != authn.SuperAdminRole { + return svcerr.ErrSuperAdminAction + } + if err := am.authz.Authorize(ctx, smqauthz.PolicyReq{ + SubjectType: policies.UserType, + Subject: session.UserID, + Permission: policies.AdminPermission, + ObjectType: policies.PlatformType, + Object: policies.SuperMQObject, + }, nil); err != nil { + return err + } + return nil +} diff --git a/alarms/mocks/repository.go b/alarms/mocks/repository.go index 581e72eac..c5fecb0fa 100644 --- a/alarms/mocks/repository.go +++ b/alarms/mocks/repository.go @@ -165,12 +165,12 @@ func (_c *Repository_DeleteAlarm_Call) RunAndReturn(run func(ctx context.Context return _c } -// ListAlarms provides a mock function for the type Repository -func (_mock *Repository) ListAlarms(ctx context.Context, pm alarms.PageMetadata) (alarms.AlarmsPage, error) { +// ListAllAlarms provides a mock function for the type Repository +func (_mock *Repository) ListAllAlarms(ctx context.Context, pm alarms.PageMetadata) (alarms.AlarmsPage, error) { ret := _mock.Called(ctx, pm) if len(ret) == 0 { - panic("no return value specified for ListAlarms") + panic("no return value specified for ListAllAlarms") } var r0 alarms.AlarmsPage @@ -191,19 +191,19 @@ func (_mock *Repository) ListAlarms(ctx context.Context, pm alarms.PageMetadata) return r0, r1 } -// Repository_ListAlarms_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListAlarms' -type Repository_ListAlarms_Call struct { +// Repository_ListAllAlarms_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListAllAlarms' +type Repository_ListAllAlarms_Call struct { *mock.Call } -// ListAlarms is a helper method to define mock.On call +// ListAllAlarms is a helper method to define mock.On call // - ctx context.Context // - pm alarms.PageMetadata -func (_e *Repository_Expecter) ListAlarms(ctx interface{}, pm interface{}) *Repository_ListAlarms_Call { - return &Repository_ListAlarms_Call{Call: _e.mock.On("ListAlarms", ctx, pm)} +func (_e *Repository_Expecter) ListAllAlarms(ctx interface{}, pm interface{}) *Repository_ListAllAlarms_Call { + return &Repository_ListAllAlarms_Call{Call: _e.mock.On("ListAllAlarms", ctx, pm)} } -func (_c *Repository_ListAlarms_Call) Run(run func(ctx context.Context, pm alarms.PageMetadata)) *Repository_ListAlarms_Call { +func (_c *Repository_ListAllAlarms_Call) Run(run func(ctx context.Context, pm alarms.PageMetadata)) *Repository_ListAllAlarms_Call { _c.Call.Run(func(args mock.Arguments) { var arg0 context.Context if args[0] != nil { @@ -221,12 +221,84 @@ func (_c *Repository_ListAlarms_Call) Run(run func(ctx context.Context, pm alarm return _c } -func (_c *Repository_ListAlarms_Call) Return(alarmsPage alarms.AlarmsPage, err error) *Repository_ListAlarms_Call { +func (_c *Repository_ListAllAlarms_Call) Return(alarmsPage alarms.AlarmsPage, err error) *Repository_ListAllAlarms_Call { _c.Call.Return(alarmsPage, err) return _c } -func (_c *Repository_ListAlarms_Call) RunAndReturn(run func(ctx context.Context, pm alarms.PageMetadata) (alarms.AlarmsPage, error)) *Repository_ListAlarms_Call { +func (_c *Repository_ListAllAlarms_Call) RunAndReturn(run func(ctx context.Context, pm alarms.PageMetadata) (alarms.AlarmsPage, error)) *Repository_ListAllAlarms_Call { + _c.Call.Return(run) + return _c +} + +// ListUserAlarms provides a mock function for the type Repository +func (_mock *Repository) ListUserAlarms(ctx context.Context, userID string, pm alarms.PageMetadata) (alarms.AlarmsPage, error) { + ret := _mock.Called(ctx, userID, pm) + + if len(ret) == 0 { + panic("no return value specified for ListUserAlarms") + } + + var r0 alarms.AlarmsPage + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, string, alarms.PageMetadata) (alarms.AlarmsPage, error)); ok { + return returnFunc(ctx, userID, pm) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, string, alarms.PageMetadata) alarms.AlarmsPage); ok { + r0 = returnFunc(ctx, userID, pm) + } else { + r0 = ret.Get(0).(alarms.AlarmsPage) + } + if returnFunc, ok := ret.Get(1).(func(context.Context, string, alarms.PageMetadata) error); ok { + r1 = returnFunc(ctx, userID, pm) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// Repository_ListUserAlarms_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListUserAlarms' +type Repository_ListUserAlarms_Call struct { + *mock.Call +} + +// ListUserAlarms is a helper method to define mock.On call +// - ctx context.Context +// - userID string +// - pm alarms.PageMetadata +func (_e *Repository_Expecter) ListUserAlarms(ctx interface{}, userID interface{}, pm interface{}) *Repository_ListUserAlarms_Call { + return &Repository_ListUserAlarms_Call{Call: _e.mock.On("ListUserAlarms", ctx, userID, pm)} +} + +func (_c *Repository_ListUserAlarms_Call) Run(run func(ctx context.Context, userID string, pm alarms.PageMetadata)) *Repository_ListUserAlarms_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 string + if args[1] != nil { + arg1 = args[1].(string) + } + var arg2 alarms.PageMetadata + if args[2] != nil { + arg2 = args[2].(alarms.PageMetadata) + } + run( + arg0, + arg1, + arg2, + ) + }) + return _c +} + +func (_c *Repository_ListUserAlarms_Call) Return(alarmsPage alarms.AlarmsPage, err error) *Repository_ListUserAlarms_Call { + _c.Call.Return(alarmsPage, err) + return _c +} + +func (_c *Repository_ListUserAlarms_Call) RunAndReturn(run func(ctx context.Context, userID string, pm alarms.PageMetadata) (alarms.AlarmsPage, error)) *Repository_ListUserAlarms_Call { _c.Call.Return(run) return _c } diff --git a/alarms/operations.go b/alarms/operations.go deleted file mode 100644 index 94f97cba8..000000000 --- a/alarms/operations.go +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -package alarms - -import ( - "github.com/absmach/supermq/pkg/errors" - "github.com/absmach/supermq/pkg/permissions" - "github.com/absmach/supermq/pkg/policies" -) - -const EntityType = "alarms" - -const ( - OpAddAlarm = iota - OpViewAlarm - OpListAlarms - OpUpdateAlarm - OpDeleteAlarm - OpAssignAlarm - OpAcknowledgeAlarm - OpResolveAlarm -) - -const ( - OpAddAlarmStr = "OpAddAlarm" - OpViewAlarmStr = "OpViewAlarm" - OpListAlarmsStr = "OpListAlarms" - OpUpdateAlarmStr = "OpUpdateAlarm" - OpDeleteAlarmStr = "OpDeleteAlarm" - OpAssignAlarmStr = "OpAssignAlarm" - OpAcknowledgeAlarmStr = "OpAcknowledgeAlarm" - OpResolveAlarmStr = "OpResolveAlarm" -) - -func GetPermission(op permissions.Operation) (string, error) { - if op < OpAddAlarm || op > OpResolveAlarm { - return "", errors.New("invalid operation") - } - - if op == OpUpdateAlarm || op == OpDeleteAlarm { - return policies.AdminPermission, nil - } - - return policies.MembershipPermission, nil -} - -func OperationName(op permissions.Operation) string { - switch op { - case OpAddAlarm: - return OpAddAlarmStr - case OpViewAlarm: - return OpViewAlarmStr - case OpListAlarms: - return OpListAlarmsStr - case OpUpdateAlarm: - return OpUpdateAlarmStr - case OpDeleteAlarm: - return OpDeleteAlarmStr - case OpAssignAlarm: - return OpAssignAlarmStr - case OpAcknowledgeAlarm: - return OpAcknowledgeAlarmStr - case OpResolveAlarm: - return OpResolveAlarmStr - default: - return "unknown" - } -} diff --git a/alarms/operations/operations.go b/alarms/operations/operations.go new file mode 100644 index 000000000..e7277ae48 --- /dev/null +++ b/alarms/operations/operations.go @@ -0,0 +1,47 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package operations + +import "github.com/absmach/supermq/pkg/permissions" + +const EntityType = "alarm" + +// Alarm Operations. +const ( + OpViewAlarm permissions.Operation = iota + OpDeleteAlarm + OpListAlarms + OpAssignAlarm + OpAcknowledgeAlarm + OpResolveAlarm +) + +func OperationDetails() map[permissions.Operation]permissions.OperationDetails { + return map[permissions.Operation]permissions.OperationDetails{ + OpViewAlarm: { + Name: "view", + PermissionRequired: true, + }, + OpDeleteAlarm: { + Name: "delete", + PermissionRequired: true, + }, + OpListAlarms: { + Name: "list", + PermissionRequired: true, + }, + OpAssignAlarm: { + Name: "assign", + PermissionRequired: true, + }, + OpAcknowledgeAlarm: { + Name: "acknowledge", + PermissionRequired: true, + }, + OpResolveAlarm: { + Name: "resolve", + PermissionRequired: true, + }, + } +} diff --git a/alarms/postgres/alarms.go b/alarms/postgres/alarms.go index 2e49c161c..35da1583e 100644 --- a/alarms/postgres/alarms.go +++ b/alarms/postgres/alarms.go @@ -20,6 +20,10 @@ import ( "github.com/jmoiron/sqlx" ) +const alarmColumns = `alarms.id, alarms.rule_id, alarms.domain_id, alarms.channel_id, alarms.client_id, alarms.subtopic, alarms.measurement, alarms.value, alarms.unit, +alarms.threshold, alarms.cause, alarms.status, alarms.severity, alarms.assignee_id, alarms.created_at, alarms.updated_at, alarms.updated_by, alarms.assigned_at, +alarms.assigned_by, alarms.acknowledged_at, alarms.acknowledged_by, alarms.resolved_at, alarms.resolved_by, alarms.metadata` + type repository struct { db *sqlx.DB } @@ -183,32 +187,49 @@ func (r *repository) ViewAlarm(ctx context.Context, alarmID, domainID string) (a return alarm, nil } -func (r *repository) ListAlarms(ctx context.Context, pm alarms.PageMetadata) (alarms.AlarmsPage, error) { +func (r *repository) ListAllAlarms(ctx context.Context, pm alarms.PageMetadata) (alarms.AlarmsPage, error) { query, err := pageQuery(pm) if err != nil { return alarms.AlarmsPage{}, errors.Wrap(repoerr.ErrViewEntity, err) } + comQuery := fmt.Sprintf(`SELECT %s FROM alarms %s`, alarmColumns, query) + + return r.alarmsPage(ctx, comQuery, pm) +} + +func (r *repository) ListUserAlarms(ctx context.Context, userID string, pm alarms.PageMetadata) (alarms.AlarmsPage, error) { + query, err := pageQuery(pm) + if err != nil { + return alarms.AlarmsPage{}, errors.Wrap(repoerr.ErrViewEntity, err) + } + + pm.UserID = userID + comQuery := fmt.Sprintf(`SELECT DISTINCT %s + FROM alarms + INNER JOIN rules_roles rr ON rr.entity_id = alarms.rule_id + INNER JOIN rules_role_members rrm ON rrm.role_id = rr.id AND rrm.member_id = :user_id + %s`, alarmColumns, query) + + return r.alarmsPage(ctx, comQuery, pm) +} + +func (r *repository) alarmsPage(ctx context.Context, comQuery string, pm alarms.PageMetadata) (alarms.AlarmsPage, error) { dir := api.DescDir if pm.Dir == api.AscDir { dir = api.AscDir } - orderClause := "" - + var orderClause string switch pm.Order { case api.CreatedAtOrder: orderClause = fmt.Sprintf("ORDER BY created_at %s, id %s", dir, dir) - case api.UpdatedAtOrder: - orderClause = fmt.Sprintf("ORDER BY COALESCE(updated_at, created_at) %s, id %s", dir, dir) default: orderClause = fmt.Sprintf("ORDER BY COALESCE(updated_at, created_at) %s, id %s", dir, dir) } - q := fmt.Sprintf(`SELECT id, rule_id, domain_id, channel_id, client_id, subtopic, measurement, value, unit, - threshold, cause, status, severity, assignee_id, created_at, updated_at, updated_by, assigned_at, - assigned_by, acknowledged_at, acknowledged_by, resolved_at, resolved_by, metadata - FROM alarms %s %s LIMIT :limit OFFSET :offset;`, query, orderClause) + q := fmt.Sprintf(`SELECT * FROM (%s) AS sub_query %s LIMIT :limit OFFSET :offset;`, comQuery, orderClause) + cq := fmt.Sprintf(`SELECT COUNT(*) AS total_count FROM (%s) AS sub_query;`, comQuery) rows, err := r.db.NamedQueryContext(ctx, q, pm) if err != nil { @@ -231,8 +252,7 @@ func (r *repository) ListAlarms(ctx context.Context, pm alarms.PageMetadata) (al items = append(items, a) } - q = fmt.Sprintf(`SELECT COUNT(*) FROM alarms %s;`, query) - total, err := postgres.Total(ctx, r.db, q, pm) + total, err := postgres.Total(ctx, r.db, cq, pm) if err != nil { return alarms.AlarmsPage{}, errors.Wrap(repoerr.ErrViewEntity, err) } @@ -444,49 +464,49 @@ func toAlarm(dbr dbAlarm) (alarms.Alarm, error) { func pageQuery(pm alarms.PageMetadata) (string, error) { var query []string if pm.DomainID != "" { - query = append(query, "domain_id = :domain_id") + query = append(query, "alarms.domain_id = :domain_id") } if pm.RuleID != "" { - query = append(query, "rule_id = :rule_id") + query = append(query, "alarms.rule_id = :rule_id") } if pm.ChannelID != "" { - query = append(query, "channel_id = :channel_id") + query = append(query, "alarms.channel_id = :channel_id") } if pm.Subtopic != "" { - query = append(query, "subtopic = :subtopic") + query = append(query, "alarms.subtopic = :subtopic") } if pm.ClientID != "" { - query = append(query, "client_id = :client_id") + query = append(query, "alarms.client_id = :client_id") } if pm.Measurement != "" { - query = append(query, "measurement = :measurement") + query = append(query, "alarms.measurement = :measurement") } if pm.Status != alarms.AllStatus { - query = append(query, "status = :status") + query = append(query, "alarms.status = :status") } if pm.Severity != math.MaxUint8 { - query = append(query, "severity = :severity") + query = append(query, "alarms.severity = :severity") } if pm.AssigneeID != "" { - query = append(query, "assignee_id = :assignee_id") + query = append(query, "alarms.assignee_id = :assignee_id") } if pm.UpdatedBy != "" { - query = append(query, "updated_by = :updated_by") + query = append(query, "alarms.updated_by = :updated_by") } if pm.ResolvedBy != "" { - query = append(query, "resolved_by = :resolved_by") + query = append(query, "alarms.resolved_by = :resolved_by") } if pm.AcknowledgedBy != "" { - query = append(query, "acknowledged_by = :acknowledged_by") + query = append(query, "alarms.acknowledged_by = :acknowledged_by") } if pm.AssignedBy != "" { - query = append(query, "assigned_by = :assigned_by") + query = append(query, "alarms.assigned_by = :assigned_by") } if !pm.CreatedFrom.IsZero() { - query = append(query, "created_at >= :created_from") + query = append(query, "alarms.created_at >= :created_from") } if !pm.CreatedTo.IsZero() { - query = append(query, "created_at <= :created_to") + query = append(query, "alarms.created_at <= :created_to") } var emq string diff --git a/alarms/postgres/alarms_test.go b/alarms/postgres/alarms_test.go index 16f0568fb..ec2a06060 100644 --- a/alarms/postgres/alarms_test.go +++ b/alarms/postgres/alarms_test.go @@ -403,7 +403,7 @@ func TestListAlarms(t *testing.T) { } for _, tc := range cases { t.Run(tc.desc, func(t *testing.T) { - alarms, err := repo.ListAlarms(context.Background(), tc.pm) + alarms, err := repo.ListAllAlarms(context.Background(), tc.pm) if tc.err != nil { assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) @@ -415,6 +415,184 @@ func TestListAlarms(t *testing.T) { } } +func TestListUserAlarms(t *testing.T) { + t.Cleanup(func() { + _, err := db.Exec("DELETE FROM alarms") + require.Nil(t, err, fmt.Sprintf("clean alarms unexpected error: %s", err)) + _, err = db.Exec("DELETE FROM rules") + require.Nil(t, err, fmt.Sprintf("clean rules unexpected error: %s", err)) + }) + + repo := postgres.NewAlarmsRepo(db) + + domainID := generateUUID(t) + userID := generateUUID(t) + otherUserID := generateUUID(t) + adminUserID := generateUUID(t) + + // Create 10 rules and 10 alarms referencing them. + // Assign userID to the first 6 rules via role membership. + var ruleIDs []string + var createdAlarms []alarms.Alarm + for i := range 10 { + ruleID := generateUUID(t) + _, err := db.Exec(`INSERT INTO rules (id, name, domain_id, status, logic_type, logic_value) VALUES ($1, $2, $3, 0, 0, '')`, + ruleID, fmt.Sprintf("rule-%d", i), domainID) + require.Nil(t, err, fmt.Sprintf("insert rule unexpected error: %s", err)) + ruleIDs = append(ruleIDs, ruleID) + + alarm := alarms.Alarm{ + ID: generateUUID(t), + RuleID: ruleID, + DomainID: domainID, + ChannelID: generateUUID(t), + ClientID: generateUUID(t), + Measurement: namegen.Generate(), + Value: namegen.Generate(), + Unit: namegen.Generate(), + Threshold: namegen.Generate(), + Cause: namegen.Generate(), + Status: 0, + AssigneeID: generateUUID(t), + CreatedAt: time.Now().UTC().Add(time.Duration(i) * time.Minute), + } + alarm, err = repo.CreateAlarm(context.Background(), alarm) + require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) + createdAlarms = append(createdAlarms, alarm) + } + + // Assign userID to the first 6 rules via rules_roles + rules_role_members. + userRoleIDs := make([]string, 6) + for i := range 6 { + roleID := generateUUID(t) + userRoleIDs[i] = roleID + _, err := db.Exec(`INSERT INTO rules_roles (id, name, entity_id) VALUES ($1, $2, $3)`, roleID, "admin", ruleIDs[i]) + require.Nil(t, err, fmt.Sprintf("insert rules_roles unexpected error: %s", err)) + _, err = db.Exec(`INSERT INTO rules_role_members (role_id, member_id, entity_id) VALUES ($1, $2, $3)`, roleID, userID, ruleIDs[i]) + require.Nil(t, err, fmt.Sprintf("insert rules_role_members unexpected error: %s", err)) + } + + for i := range 10 { + var roleID string + if i < 6 { + roleID = userRoleIDs[i] + } else { + roleID = generateUUID(t) + _, err := db.Exec(`INSERT INTO rules_roles (id, name, entity_id) VALUES ($1, $2, $3)`, roleID, "admin", ruleIDs[i]) + require.Nil(t, err, fmt.Sprintf("insert rules_roles unexpected error: %s", err)) + } + _, err := db.Exec(`INSERT INTO rules_role_members (role_id, member_id, entity_id) VALUES ($1, $2, $3)`, roleID, adminUserID, ruleIDs[i]) + require.Nil(t, err, fmt.Sprintf("insert rules_role_members unexpected error: %s", err)) + } + + _ = createdAlarms + + cases := []struct { + desc string + userID string + pm alarms.PageMetadata + count int + err error + }{ + { + desc: "list user alarms returns only accessible alarms", + userID: userID, + pm: alarms.PageMetadata{ + Offset: 0, + Limit: 100, + }, + count: 6, + err: nil, + }, + { + desc: "list user alarms with limit", + userID: userID, + pm: alarms.PageMetadata{ + Offset: 0, + Limit: 3, + }, + count: 3, + err: nil, + }, + { + desc: "list user alarms with offset", + userID: userID, + pm: alarms.PageMetadata{ + Offset: 4, + Limit: 100, + }, + count: 2, + err: nil, + }, + { + desc: "list user alarms with domain filter", + userID: userID, + pm: alarms.PageMetadata{ + DomainID: domainID, + Offset: 0, + Limit: 100, + }, + count: 6, + err: nil, + }, + { + desc: "list user alarms with non-existing domain returns 0", + userID: userID, + pm: alarms.PageMetadata{ + DomainID: generateUUID(t), + Offset: 0, + Limit: 100, + }, + count: 0, + err: nil, + }, + { + desc: "list alarms for user with no role assignments returns 0", + userID: otherUserID, + pm: alarms.PageMetadata{ + Offset: 0, + Limit: 100, + }, + count: 0, + err: nil, + }, + { + desc: "list alarms for admin user with role on all rules returns all alarms", + userID: adminUserID, + pm: alarms.PageMetadata{ + Offset: 0, + Limit: 100, + }, + count: 10, + err: nil, + }, + { + desc: "list user alarms ordered by created_at ascending", + userID: userID, + pm: alarms.PageMetadata{ + Offset: 0, + Limit: 100, + Order: "created_at", + Dir: "asc", + }, + count: 6, + err: nil, + }, + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + page, err := repo.ListUserAlarms(context.Background(), tc.userID, tc.pm) + if tc.err != nil { + assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) + return + } + require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) + assert.Equal(t, tc.count, len(page.Alarms), fmt.Sprintf("%s: expected %d alarms, got %d", tc.desc, tc.count, len(page.Alarms))) + }) + } +} + func TestDeleteAlarm(t *testing.T) { t.Cleanup(func() { _, err := db.Exec("DELETE FROM alarms") diff --git a/alarms/postgres/init.go b/alarms/postgres/init.go index e25b97b60..631fcdab2 100644 --- a/alarms/postgres/init.go +++ b/alarms/postgres/init.go @@ -4,13 +4,16 @@ package postgres import ( + rpostgres "github.com/absmach/magistrala/re/postgres" + "github.com/absmach/supermq/pkg/errors" + repoerr "github.com/absmach/supermq/pkg/errors/repository" _ "github.com/jackc/pgx/v5/stdlib" // required for SQL access migrate "github.com/rubenv/sql-migrate" ) -// Migration of Users service. -func Migration() *migrate.MemoryMigrationSource { - return &migrate.MemoryMigrationSource{ +// Migration of Alarms service. +func Migration() (*migrate.MemoryMigrationSource, error) { + alarmsMigration := &migrate.MemoryMigrationSource{ Migrations: []*migrate.Migration{ { Id: "alarms_01", @@ -50,4 +53,13 @@ func Migration() *migrate.MemoryMigrationSource { }, }, } + + rulesMigration, err := rpostgres.Migration() + if err != nil { + return &migrate.MemoryMigrationSource{}, errors.Wrap(repoerr.ErrRoleMigration, err) + } + + alarmsMigration.Migrations = append(alarmsMigration.Migrations, rulesMigration.Migrations...) + + return alarmsMigration, nil } diff --git a/alarms/postgres/setup_test.go b/alarms/postgres/setup_test.go index 3452d75ef..990bf13ac 100644 --- a/alarms/postgres/setup_test.go +++ b/alarms/postgres/setup_test.go @@ -75,7 +75,11 @@ func TestMain(m *testing.M) { SSLRootCert: "", } - if db, err = postgres.Setup(dbConfig, *apostgres.Migration()); err != nil { + migration, err := apostgres.Migration() + if err != nil { + log.Fatalf("Could not get migration: %s", err) + } + if db, err = postgres.Setup(dbConfig, *migration); err != nil { log.Fatalf("Could not setup test DB connection: %s", err) } diff --git a/alarms/service.go b/alarms/service.go index 2ae4f8c80..75c44abed 100644 --- a/alarms/service.go +++ b/alarms/service.go @@ -43,6 +43,7 @@ func (s *service) CreateAlarm(ctx context.Context, alarm Alarm) error { if _, err = s.repo.CreateAlarm(ctx, alarm); err != nil && err != repoerr.ErrNotFound { return err } + return nil } @@ -51,7 +52,10 @@ func (s *service) ViewAlarm(ctx context.Context, session authn.Session, alarmID } func (s *service) ListAlarms(ctx context.Context, session authn.Session, pm PageMetadata) (AlarmsPage, error) { - return s.repo.ListAlarms(ctx, pm) + if session.SuperAdmin { + return s.repo.ListAllAlarms(ctx, pm) + } + return s.repo.ListUserAlarms(ctx, session.UserID, pm) } func (s *service) DeleteAlarm(ctx context.Context, session authn.Session, alarmID string) error { diff --git a/alarms/service_test.go b/alarms/service_test.go index a4ab2e285..700072620 100644 --- a/alarms/service_test.go +++ b/alarms/service_test.go @@ -6,7 +6,6 @@ package alarms_test import ( "context" "fmt" - "math" "testing" "time" @@ -22,9 +21,13 @@ import ( var idp = uuid.New() +func newService(t *testing.T, repo *mocks.Repository) alarms.Service { + return alarms.NewService(idp, repo) +} + func TestCreateAlarm(t *testing.T) { repo := new(mocks.Repository) - svc := alarms.NewService(idp, repo) + svc := newService(t, repo) ts := time.Now() cases := []struct { desc string @@ -69,33 +72,16 @@ func TestCreateAlarm(t *testing.T) { for _, tc := range cases { t.Run(tc.desc, func(t *testing.T) { repoCall := repo.On("CreateAlarm", context.Background(), mock.Anything).Return(tc.alarm, tc.err) - repoCall1 := repo.On("ListAlarms", context.Background(), alarms.PageMetadata{ - Offset: 0, Limit: 1, - DomainID: tc.alarm.DomainID, - ChannelID: tc.alarm.ChannelID, - ClientID: tc.alarm.ClientID, - Subtopic: tc.alarm.Subtopic, - Measurement: tc.alarm.Measurement, - RuleID: tc.alarm.RuleID, - Status: alarms.AllStatus, - Severity: math.MaxUint8, - CreatedTo: tc.alarm.CreatedAt, - }).Return(alarms.AlarmsPage{}, tc.err) err := svc.CreateAlarm(context.Background(), tc.alarm) - if tc.err != nil { - assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) - - return - } + assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) repoCall.Unset() - repoCall1.Unset() }) } } func TestViewAlarm(t *testing.T) { repo := new(mocks.Repository) - svc := alarms.NewService(idp, repo) + svc := newService(t, repo) cases := []struct { desc string @@ -134,7 +120,7 @@ func TestViewAlarm(t *testing.T) { func TestUpdateAlarm(t *testing.T) { repo := new(mocks.Repository) - svc := alarms.NewService(idp, repo) + svc := newService(t, repo) cases := []struct { desc string @@ -192,7 +178,7 @@ func TestUpdateAlarm(t *testing.T) { func TestListAlarms(t *testing.T) { repo := new(mocks.Repository) - svc := alarms.NewService(idp, repo) + svc := newService(t, repo) cases := []struct { desc string @@ -219,7 +205,7 @@ func TestListAlarms(t *testing.T) { for _, tc := range cases { t.Run(tc.desc, func(t *testing.T) { s := authn.Session{DomainID: tc.pm.DomainID} - repoCall := repo.On("ListAlarms", context.Background(), tc.pm).Return(tc.page, tc.err) + repoCall := repo.On("ListUserAlarms", context.Background(), s.UserID, tc.pm).Return(tc.page, tc.err) _, err := svc.ListAlarms(context.Background(), s, tc.pm) if tc.err != nil { assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) @@ -233,7 +219,7 @@ func TestListAlarms(t *testing.T) { func TestDeleteAlarm(t *testing.T) { repo := new(mocks.Repository) - svc := alarms.NewService(idp, repo) + svc := newService(t, repo) cases := []struct { desc string diff --git a/cmd/alarms/main.go b/cmd/alarms/main.go index a213689b0..8ccd9690b 100644 --- a/cmd/alarms/main.go +++ b/cmd/alarms/main.go @@ -15,17 +15,23 @@ import ( "github.com/absmach/magistrala/alarms/brokers" "github.com/absmach/magistrala/alarms/consumer" "github.com/absmach/magistrala/alarms/middleware" + "github.com/absmach/magistrala/alarms/operations" alarmsRepo "github.com/absmach/magistrala/alarms/postgres" "github.com/absmach/magistrala/pkg/prometheus" + rconsumer "github.com/absmach/magistrala/pkg/re/events/consumer" + rpostgres "github.com/absmach/magistrala/re/postgres" + dpostgres "github.com/absmach/supermq/domains/postgres" smqlog "github.com/absmach/supermq/logger" smqauthn "github.com/absmach/supermq/pkg/authn" "github.com/absmach/supermq/pkg/authn/authsvc" authsvcAuthz "github.com/absmach/supermq/pkg/authz/authsvc" + dconsumer "github.com/absmach/supermq/pkg/domains/events/consumer" domainsAuthz "github.com/absmach/supermq/pkg/domains/grpcclient" "github.com/absmach/supermq/pkg/grpcclient" "github.com/absmach/supermq/pkg/jaeger" "github.com/absmach/supermq/pkg/messaging" brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing" + "github.com/absmach/supermq/pkg/permissions" "github.com/absmach/supermq/pkg/postgres" "github.com/absmach/supermq/pkg/server" httpserver "github.com/absmach/supermq/pkg/server/http" @@ -42,14 +48,18 @@ const ( defDB = "alarms" defSvcHTTPPort = "8050" envPrefixDomains = "SMQ_DOMAINS_GRPC_" + alarmEntity = "alarm" ) type config struct { - LogLevel string `env:"MG_ALARMS_LOG_LEVEL" envDefault:"info"` - BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"` - InstanceID string `env:"MG_ALARMS_INSTANCE_ID" envDefault:""` - JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` - TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"` + LogLevel string `env:"MG_ALARMS_LOG_LEVEL" envDefault:"info"` + BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"` + InstanceID string `env:"MG_ALARMS_INSTANCE_ID" envDefault:""` + JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` + TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"` + ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"` + ESConsumerName string `env:"MG_ALARMS_EVENT_CONSUMER" envDefault:"alarms"` + PermissionsFile string `env:"SMQ_PERMISSIONS_FILE" envDefault:"permission.yaml"` } func main() { @@ -87,7 +97,14 @@ func main() { logger.Error(err.Error()) } - db, err := postgres.Setup(dbConfig, *alarmsRepo.Migration()) + migrations, err := alarmsRepo.Migration() + if err != nil { + logger.Error(fmt.Sprintf("failed to load migrations: %s", err)) + exitCode = 1 + return + } + + db, err := postgres.Setup(dbConfig, *migrations) if err != nil { logger.Error(err.Error()) exitCode = 1 @@ -138,11 +155,63 @@ func main() { logger.Info("AuthZ successfully connected to auth gRPC server " + authzHandler.Secure()) + ddatabase := postgres.NewDatabase(db, dbConfig, tracer) + drepo := dpostgres.NewRepository(ddatabase) + + if err := dconsumer.DomainsEventsSubscribe(ctx, drepo, cfg.ESURL, cfg.ESConsumerName, logger); err != nil { + logger.Error(fmt.Sprintf("failed to create domains event store : %s", err)) + exitCode = 1 + return + } + + rdatabase := postgres.NewDatabase(db, dbConfig, tracer) + rrepo := rpostgres.NewRepository(rdatabase) + + if err := rconsumer.RulesEventsSubscribe(ctx, rrepo, cfg.ESURL, cfg.ESConsumerName, logger); err != nil { + logger.Error(fmt.Sprintf("failed to subscribe to rules events: %s", err)) + exitCode = 1 + return + } + idp := uuid.New() svc := alarms.NewService(idp, repo) - svc = middleware.NewAuthorizationMiddleware(svc, authz) + permConfig, err := permissions.ParsePermissionsFile(cfg.PermissionsFile) + if err != nil { + logger.Error(fmt.Sprintf("failed to parse permissions file: %s", err)) + exitCode = 1 + return + } + + alarmOps, _, err := permConfig.GetEntityPermissions(alarmEntity) + if err != nil { + logger.Error(fmt.Sprintf("failed to get alarm permissions: %s", err)) + exitCode = 1 + return + } + + entitiesOps, err := permissions.NewEntitiesOperations( + permissions.EntitiesPermission{ + operations.EntityType: alarmOps, + }, + permissions.EntitiesOperationDetails[permissions.Operation]{ + operations.EntityType: operations.OperationDetails(), + }, + ) + if err != nil { + logger.Error(fmt.Sprintf("failed to create entity operations: %s", err)) + exitCode = 1 + return + } + + svc, err = middleware.NewAuthorizationMiddleware(svc, authz, entitiesOps) + if err != nil { + logger.Error(fmt.Sprintf("failed to create authorization middleware: %s", err)) + exitCode = 1 + return + } + svc = middleware.NewLoggingMiddleware(logger, svc) counter, latency := prometheus.MakeMetrics("alarms", "api") svc = middleware.NewMetricsMiddleware(counter, latency, svc) diff --git a/docker/.env b/docker/.env index 457f138f2..d97bbdd7f 100644 --- a/docker/.env +++ b/docker/.env @@ -52,6 +52,8 @@ SMQ_AUTH_GRPC_TIMEOUT=300s SMQ_AUTH_GRPC_CLIENT_CERT=${GRPC_MTLS:+./ssl/certs/auth-grpc-client.crt} SMQ_AUTH_GRPC_CLIENT_KEY=${GRPC_MTLS:+./ssl/certs/auth-grpc-client.key} SMQ_AUTH_GRPC_CLIENT_CA_CERTS=${GRPC_MTLS:+./ssl/certs/ca.crt} +SMQ_AUTH_ACCESS_TOKEN_DURATION=1h +SMQ_AUTH_REFRESH_TOKEN_DURATION=24h #### Clients Client Config SMQ_CLIENTS_URL=http://clients:9006 @@ -148,6 +150,7 @@ MG_ALARMS_DB_SSL_CERT= MG_ALARMS_DB_SSL_KEY= MG_ALARMS_DB_SSL_ROOT_CERT= MG_ALARMS_INSTANCE_ID= +MG_ALARMS_EVENT_CONSUMER=alarms ### REPORTS MG_REPORTS_LOG_LEVEL=debug @@ -416,6 +419,7 @@ MG_BACKEND_OBJECT_STORAGE_ACCESS_KEY=localKey MG_BACKEND_OBJECT_STORAGE_SECRET_KEY=localSecret MG_BACKEND_OBJECT_STORAGE_WRITE_TTL=1m MG_BACKEND_OBJECT_STORAGE_READ_TTL=15m +MG_BACKEND_OBJECT_STORAGE_TTL=15m #### Auth GRPC Client Config MG_AUTH_GRPC_URL=auth:7001 @@ -493,4 +497,4 @@ MG_RELEASE_TAG=latest SMQ_ALLOW_UNVERIFIED_USER=true # Set to yes to accept the EULA for the UI services. To view the EULA visit: https://github.com/absmach/eula -MG_UI_DOCKER_ACCEPT_EULA=yes +MG_UI_DOCKER_ACCEPT_EULA=no diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 7b8a9d538..ac1bd18d4 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -397,6 +397,7 @@ services: MG_ALARMS_DB_SSL_KEY: ${MG_ALARMS_DB_SSL_KEY} MG_ALARMS_DB_SSL_ROOT_CERT: ${MG_ALARMS_DB_SSL_ROOT_CERT} SMQ_MESSAGE_BROKER_URL: ${SMQ_MESSAGE_BROKER_URL} + SMQ_ES_URL: ${SMQ_ES_URL} SMQ_JAEGER_URL: ${SMQ_JAEGER_URL} SMQ_JAEGER_TRACE_RATIO: ${SMQ_JAEGER_TRACE_RATIO} SMQ_AUTH_GRPC_URL: ${SMQ_AUTH_GRPC_URL} @@ -415,6 +416,7 @@ services: SMQ_SPICEDB_SCHEMA_FILE: ${SMQ_SPICEDB_SCHEMA_FILE} SMQ_PERMISSIONS_FILE: ${SMQ_PERMISSIONS_FILE} MG_ALARMS_INSTANCE_ID: ${MG_ALARMS_INSTANCE_ID} + MG_ALARMS_EVENT_CONSUMER: ${MG_ALARMS_EVENT_CONSUMER} SMQ_ALLOW_UNVERIFIED_USER: ${SMQ_ALLOW_UNVERIFIED_USER} ports: - ${MG_ALARMS_HTTP_PORT}:${MG_ALARMS_HTTP_PORT} diff --git a/docker/permission.yaml b/docker/permission.yaml index e14de2987..039bac3f4 100644 --- a/docker/permission.yaml +++ b/docker/permission.yaml @@ -3,13 +3,13 @@ alarm: operations: - - add: alarm_create_permission - list: alarm_read_permission - - view: read_permission - - update: update_permission - - enable: update_permission - - disable: update_permission - - delete: delete_permission + - view: alarm_read_permission + - update: alarm_update_permission + - delete: alarm_delete_permission + - assign: alarm_assign_permission + - acknowledge: alarm_acknowledge_permission + - resolve: alarm_resolve_permission rule: operations: diff --git a/docker/spicedb/combined-schema.zed b/docker/spicedb/combined-schema.zed index 6820f6986..edf8b7526 100644 --- a/docker/spicedb/combined-schema.zed +++ b/docker/spicedb/combined-schema.zed @@ -308,7 +308,6 @@ definition domain { relation group_view_role_users: role#member | team#member // Magistrala-specific relations - relation alarm_create: role#member | team#member relation alarm_update: role#member | team#member relation alarm_read: role#member | team#member relation alarm_delete: role#member | team#member @@ -320,9 +319,9 @@ definition domain { relation rule_add_role_users: role#member | team#member relation rule_remove_role_users: role#member | team#member relation rule_view_role_users: role#member | team#member - relation rule_alarm_assign: role#member | team#member - relation rule_alarm_acknowledge: role#member | team#member - relation rule_alarm_resolve: role#member | team#member + relation alarm_assign: role#member | team#member + relation alarm_acknowledge: role#member | team#member + relation alarm_resolve: role#member | team#member relation report_create: role#member | team#member relation report_update: role#member | team#member relation report_read: role#member | team#member @@ -352,7 +351,7 @@ definition domain { channel_manage_role + channel_add_role_users + channel_remove_role_users + channel_view_role_users + group_update + group_membership + group_read + group_delete + group_set_child + group_set_parent + group_manage_role + group_add_role_users + group_remove_role_users + group_view_role_users + - alarm_create + alarm_update + alarm_read + alarm_delete + rule_create + rule_update + rule_read + rule_delete + rule_manage_role + rule_add_role_users + rule_remove_role_users + rule_view_role_users + rule_alarm_assign + rule_alarm_acknowledge + rule_alarm_resolve + report_create + report_update + report_read + report_delete + report_manage_role + report_add_role_users + report_remove_role_users + report_view_role_users + + alarm_update + alarm_read + alarm_delete + rule_create + rule_update + rule_read + rule_delete + rule_manage_role + rule_add_role_users + rule_remove_role_users + rule_view_role_users + alarm_assign + alarm_acknowledge + alarm_resolve + report_create + report_update + report_read + report_delete + report_manage_role + report_add_role_users + report_remove_role_users + report_view_role_users + organization->admin permission admin = (read & update & enable & disable & delete & manage_role & add_role_users & remove_role_users & view_role_users) + organization->admin @@ -398,7 +397,6 @@ definition domain { permission group_view_role_users_permission = group_view_role_users + team->group_view_role_users + organization->admin // Magistrala-specific permissions - permission alarm_create_permission = alarm_create + team->alarm_create + organization->admin permission alarm_update_permission = alarm_update + team->alarm_update + organization->admin permission alarm_read_permission = alarm_read + team->alarm_read + organization->admin permission alarm_delete_permission = alarm_delete + team->alarm_delete + organization->admin @@ -410,9 +408,9 @@ definition domain { permission rule_add_role_users_permission = rule_add_role_users + team->rule_add_role_users + organization->admin permission rule_remove_role_users_permission = rule_remove_role_users + team->rule_remove_role_users + organization->admin permission rule_view_role_users_permission = rule_view_role_users + team->rule_view_role_users + organization->admin - permission rule_alarm_assign_permission = rule_alarm_assign + team->rule_alarm_assign + organization->admin - permission rule_alarm_acknowledge_permission = rule_alarm_acknowledge + team->rule_alarm_acknowledge + organization->admin - permission rule_alarm_resolve_permission = rule_alarm_resolve + team->rule_alarm_resolve + organization->admin + permission alarm_assign_permission = alarm_assign + team->alarm_assign + organization->admin + permission alarm_acknowledge_permission = alarm_acknowledge + team->alarm_acknowledge + organization->admin + permission alarm_resolve_permission = alarm_resolve + team->alarm_resolve + organization->admin permission report_create_permission = report_create + team->report_create + organization->admin permission report_update_permission = report_update + team->report_update + organization->admin permission report_read_permission = report_read + team->report_read + organization->admin @@ -512,7 +510,6 @@ definition team { relation group_view_role_users: role#member | team#member // Magistrala-specific relations - relation alarm_create: role#member | team#member relation alarm_update: role#member | team#member relation alarm_read: role#member | team#member relation alarm_delete: role#member | team#member @@ -524,9 +521,9 @@ definition team { relation rule_add_role_users: role#member | team#member relation rule_remove_role_users: role#member | team#member relation rule_view_role_users: role#member | team#member - relation rule_alarm_assign: role#member | team#member - relation rule_alarm_acknowledge: role#member | team#member - relation rule_alarm_resolve: role#member | team#member + relation alarm_assign: role#member | team#member + relation alarm_acknowledge: role#member | team#member + relation alarm_resolve: role#member | team#member relation report_create: role#member | team#member relation report_update: role#member | team#member relation report_read: role#member | team#member @@ -636,18 +633,6 @@ definition platform { // Overlay team block consumed by scripts/combine-schema.sh during merge. -definition alarm { -relation domain: domain - -relation update: role#member -relation read: role#member -relation delete: role#member - -permission update_permission = update + domain->alarm_update_permission -permission read_permission = read + domain->alarm_read_permission -permission delete_permission = delete + domain->alarm_delete_permission -} - definition rule { relation domain: domain @@ -660,6 +645,7 @@ relation add_role_users: role#member relation remove_role_users: role#member relation view_role_users: role#member +relation alarm_read: role#member relation alarm_assign: role#member relation alarm_acknowledge: role#member relation alarm_resolve: role#member @@ -673,9 +659,10 @@ permission add_role_users_permission = add_role_users + domain->rule_add_role_us permission remove_role_users_permission = remove_role_users + domain->rule_remove_role_users_permission permission view_role_users_permission = view_role_users + domain->rule_view_role_users_permission -permission alarm_assign_permission = alarm_assign + domain->rule_alarm_assign_permission -permission alarm_acknowledge_permission = alarm_acknowledge + domain->rule_alarm_acknowledge_permission -permission alarm_resolve_permission = alarm_resolve + domain->rule_alarm_resolve_permission +permission alarm_read_permission = alarm_read + domain->alarm_read_permission +permission alarm_assign_permission = alarm_assign + domain->alarm_assign_permission +permission alarm_acknowledge_permission = alarm_acknowledge + domain->alarm_acknowledge_permission +permission alarm_resolve_permission = alarm_resolve + domain->alarm_resolve_permission } definition report { diff --git a/docker/spicedb/override-schema.zed b/docker/spicedb/override-schema.zed index 3fa09692c..71db4a871 100644 --- a/docker/spicedb/override-schema.zed +++ b/docker/spicedb/override-schema.zed @@ -28,7 +28,6 @@ definition domain { // Magistrala-specific relations - relation alarm_create: role#member | team#member relation alarm_update: role#member | team#member relation alarm_read: role#member | team#member relation alarm_delete: role#member | team#member @@ -41,9 +40,9 @@ definition domain { relation rule_add_role_users: role#member | team#member relation rule_remove_role_users: role#member | team#member relation rule_view_role_users: role#member | team#member - relation rule_alarm_assign: role#member | team#member - relation rule_alarm_acknowledge: role#member | team#member - relation rule_alarm_resolve: role#member | team#member + relation alarm_assign: role#member | team#member + relation alarm_acknowledge: role#member | team#member + relation alarm_resolve: role#member | team#member relation report_create: role#member | team#member relation report_update: role#member | team#member @@ -55,7 +54,6 @@ definition domain { relation report_view_role_users: role#member | team#member // Magistrala-specific permissions - permission alarm_create_permission = alarm_create + team->alarm_create + organization->admin permission alarm_update_permission = alarm_update + team->alarm_update + organization->admin permission alarm_read_permission = alarm_read + team->alarm_read + organization->admin permission alarm_delete_permission = alarm_delete + team->alarm_delete + organization->admin @@ -68,9 +66,9 @@ definition domain { permission rule_add_role_users_permission = rule_add_role_users + team->rule_add_role_users + organization->admin permission rule_remove_role_users_permission = rule_remove_role_users + team->rule_remove_role_users + organization->admin permission rule_view_role_users_permission = rule_view_role_users + team->rule_view_role_users + organization->admin - permission rule_alarm_assign_permission = rule_alarm_assign + team->rule_alarm_assign + organization->admin - permission rule_alarm_acknowledge_permission = rule_alarm_acknowledge + team->rule_alarm_acknowledge + organization->admin - permission rule_alarm_resolve_permission = rule_alarm_resolve + team->rule_alarm_resolve + organization->admin + permission alarm_assign_permission = alarm_assign + team->alarm_assign + organization->admin + permission alarm_acknowledge_permission = alarm_acknowledge + team->alarm_acknowledge + organization->admin + permission alarm_resolve_permission = alarm_resolve + team->alarm_resolve + organization->admin permission report_create_permission = report_create + team->report_create + organization->admin permission report_update_permission = report_update + team->report_update + organization->admin @@ -82,14 +80,13 @@ definition domain { permission report_view_role_users_permission = report_view_role_users + team->report_view_role_users + organization->admin // Explicit extension injected into SuperMQ domain `permission membership`. - permission membership_extension = alarm_create + alarm_update + alarm_read + alarm_delete + rule_create + rule_update + rule_read + rule_delete + rule_manage_role + rule_add_role_users + rule_remove_role_users + rule_view_role_users + rule_alarm_assign + rule_alarm_acknowledge + rule_alarm_resolve + report_create + report_update + report_read + report_delete + report_manage_role + report_add_role_users + report_remove_role_users + report_view_role_users + permission membership_extension = alarm_update + alarm_read + alarm_delete + rule_create + rule_update + rule_read + rule_delete + rule_manage_role + rule_add_role_users + rule_remove_role_users + rule_view_role_users + alarm_assign + alarm_acknowledge + alarm_resolve + report_create + report_update + report_read + report_delete + report_manage_role + report_add_role_users + report_remove_role_users + report_view_role_users } // Overlay team block consumed by scripts/combine-schema.sh during merge. definition team { - relation alarm_create: role#member | team#member relation alarm_update: role#member | team#member relation alarm_read: role#member | team#member relation alarm_delete: role#member | team#member @@ -102,9 +99,9 @@ definition team { relation rule_add_role_users: role#member | team#member relation rule_remove_role_users: role#member | team#member relation rule_view_role_users: role#member | team#member - relation rule_alarm_assign: role#member | team#member - relation rule_alarm_acknowledge: role#member | team#member - relation rule_alarm_resolve: role#member | team#member + relation alarm_assign: role#member | team#member + relation alarm_acknowledge: role#member | team#member + relation alarm_resolve: role#member | team#member relation report_create: role#member | team#member relation report_update: role#member | team#member @@ -114,19 +111,6 @@ definition team { relation report_add_role_users: role#member | team#member relation report_remove_role_users: role#member | team#member relation report_view_role_users: role#member | team#member - -} - -definition alarm { -relation domain: domain - -relation update: role#member -relation read: role#member -relation delete: role#member - -permission update_permission = update + domain->alarm_update_permission -permission read_permission = read + domain->alarm_read_permission -permission delete_permission = delete + domain->alarm_delete_permission } definition rule { @@ -141,6 +125,7 @@ relation add_role_users: role#member relation remove_role_users: role#member relation view_role_users: role#member +relation alarm_read: role#member relation alarm_assign: role#member relation alarm_acknowledge: role#member relation alarm_resolve: role#member @@ -154,9 +139,10 @@ permission add_role_users_permission = add_role_users + domain->rule_add_role_us permission remove_role_users_permission = remove_role_users + domain->rule_remove_role_users_permission permission view_role_users_permission = view_role_users + domain->rule_view_role_users_permission -permission alarm_assign_permission = alarm_assign + domain->rule_alarm_assign_permission -permission alarm_acknowledge_permission = alarm_acknowledge + domain->rule_alarm_acknowledge_permission -permission alarm_resolve_permission = alarm_resolve + domain->rule_alarm_resolve_permission +permission alarm_read_permission = alarm_read + domain->alarm_read_permission +permission alarm_assign_permission = alarm_assign + domain->alarm_assign_permission +permission alarm_acknowledge_permission = alarm_acknowledge + domain->alarm_acknowledge_permission +permission alarm_resolve_permission = alarm_resolve + domain->alarm_resolve_permission } definition report { diff --git a/docker/supermq-docker/.env b/docker/supermq-docker/.env index 9ae2b78d6..002536e03 100644 --- a/docker/supermq-docker/.env +++ b/docker/supermq-docker/.env @@ -238,8 +238,6 @@ SMQ_USERS_ADMIN_USERNAME=admin SMQ_USERS_ADMIN_FIRST_NAME=super SMQ_USERS_ADMIN_LAST_NAME=admin SMQ_USERS_PASS_REGEX=^.{8,}$ -SMQ_USERS_ACCESS_TOKEN_DURATION=15m -SMQ_USERS_REFRESH_TOKEN_DURATION=24h SMQ_USERS_HTTP_HOST=users SMQ_USERS_HTTP_PORT=9002 SMQ_USERS_HTTP_SERVER_CERT= @@ -263,8 +261,6 @@ SMQ_USERS_SECRET_KEY=HyE2D4RUt9nnKG6v8zKEqAp6g6ka8hhZsqUpzgKvnwpXrNVQSH SMQ_USERS_ADMIN_EMAIL=admin@example.com SMQ_USERS_ADMIN_PASSWORD=12345678 SMQ_USERS_PASS_REGEX=^.{8,}$ -SMQ_USERS_ACCESS_TOKEN_DURATION=15m -SMQ_USERS_REFRESH_TOKEN_DURATION=24h SMQ_USERS_ALLOW_SELF_REGISTER=true SMQ_OAUTH_UI_REDIRECT_URL=http://localhost:9095${SMQ_UI_PATH_PREFIX}/tokens/secure SMQ_OAUTH_UI_ERROR_URL=http://localhost:9095${SMQ_UI_PATH_PREFIX}/error diff --git a/docker/supermq-docker/Dockerfile b/docker/supermq-docker/Dockerfile index 7d292a60b..985918d8b 100644 --- a/docker/supermq-docker/Dockerfile +++ b/docker/supermq-docker/Dockerfile @@ -1,7 +1,7 @@ # Copyright (c) Abstract Machines # SPDX-License-Identifier: Apache-2.0 -FROM golang:1.26.0-alpine3.22 AS builder +FROM golang:1.26.1-alpine3.22 AS builder ARG SVC ARG GOARCH ARG GOARM diff --git a/docker/supermq-docker/docker-compose.yaml b/docker/supermq-docker/docker-compose.yaml index dc82038fb..8b1bf2fd5 100644 --- a/docker/supermq-docker/docker-compose.yaml +++ b/docker/supermq-docker/docker-compose.yaml @@ -862,8 +862,6 @@ services: SMQ_USERS_ADMIN_FIRST_NAME: ${SMQ_USERS_ADMIN_FIRST_NAME} SMQ_USERS_ADMIN_LAST_NAME: ${SMQ_USERS_ADMIN_LAST_NAME} SMQ_USERS_PASS_REGEX: ${SMQ_USERS_PASS_REGEX} - SMQ_USERS_ACCESS_TOKEN_DURATION: ${SMQ_USERS_ACCESS_TOKEN_DURATION} - SMQ_USERS_REFRESH_TOKEN_DURATION: ${SMQ_USERS_REFRESH_TOKEN_DURATION} SMQ_USERS_HTTP_HOST: ${SMQ_USERS_HTTP_HOST} SMQ_USERS_HTTP_PORT: ${SMQ_USERS_HTTP_PORT} SMQ_USERS_HTTP_SERVER_CERT: ${SMQ_USERS_HTTP_SERVER_CERT} diff --git a/go.mod b/go.mod index 1201125eb..9d68604f7 100644 --- a/go.mod +++ b/go.mod @@ -52,9 +52,6 @@ require ( github.com/authzed/cel-go v0.20.2 // indirect github.com/authzed/spicedb v1.49.2 // indirect github.com/ccoveille/go-safecast/v2 v2.0.0 // indirect - github.com/containerd/errdefs v1.0.0 // indirect - github.com/containerd/errdefs/pkg v0.3.0 // indirect - github.com/distribution/reference v0.6.0 // indirect github.com/emirpasic/gods v1.18.1 // indirect github.com/go-errors/errors v1.5.1 // indirect github.com/go-logr/zerologr v1.2.3 // indirect @@ -67,6 +64,12 @@ require ( sigs.k8s.io/controller-runtime v0.22.4 // indirect ) +require ( + github.com/containerd/errdefs v1.0.0 // indirect + github.com/containerd/errdefs/pkg v0.3.0 // indirect + github.com/distribution/reference v0.6.0 // indirect +) + require ( buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.11-20251209175733-2a1774d88802.1 // indirect dario.cat/mergo v1.0.2 // indirect diff --git a/pkg/policies/evaluator.go b/pkg/policies/evaluator.go index 8b235669c..2d33717c0 100644 --- a/pkg/policies/evaluator.go +++ b/pkg/policies/evaluator.go @@ -6,4 +6,5 @@ package policies const ( RulesType = "rules" ReportsType = "reports" + AlarmsType = "alarms" ) diff --git a/pkg/re/events/consumer/decode.go b/pkg/re/events/consumer/decode.go new file mode 100644 index 000000000..46210ed7a --- /dev/null +++ b/pkg/re/events/consumer/decode.go @@ -0,0 +1,204 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package consumer + +import ( + "encoding/json" + "time" + + "github.com/absmach/magistrala/pkg/schedule" + "github.com/absmach/magistrala/re" + "github.com/absmach/supermq/pkg/errors" + "github.com/absmach/supermq/pkg/roles" + rconsumer "github.com/absmach/supermq/pkg/roles/rolemanager/events/consumer" +) + +var ( + errDecodeAddRuleEvent = errors.New("failed to decode rule add event") + errDecodeUpdateRuleEvent = errors.New("failed to decode rule update event") + errDecodeUpdateRuleTagsEvent = errors.New("failed to decode rule update tags event") + errDecodeUpdateRuleScheduleEvent = errors.New("failed to decode rule update schedule event") + errDecodeEnableRuleEvent = errors.New("failed to decode rule enable event") + errDecodeDisableRuleEvent = errors.New("failed to decode rule disable event") + errDecodeRemoveRuleEvent = errors.New("failed to decode rule remove event") + + errID = errors.New("missing or invalid 'id'") + errName = errors.New("missing or invalid 'name'") + errTags = errors.New("invalid 'tags'") + errStatus = errors.New("missing or invalid 'status'") + errConvertStatus = errors.New("failed to convert status") + errCreatedBy = errors.New("missing or invalid 'created_by'") + errCreatedAt = errors.New("failed to parse 'created_at' time") + errUpdatedAt = errors.New("failed to parse 'updated_at' time") + errDecodeLogic = errors.New("failed to decode 'logic'") + errDecodeSchedule = errors.New("failed to decode 'schedule'") +) + +// ToRule decodes a map[string]any event payload into a re.Rule. +func ToRule(data map[string]any) (re.Rule, error) { + var r re.Rule + + id, ok := data["id"].(string) + if !ok { + return re.Rule{}, errID + } + r.ID = id + + name, ok := data["name"].(string) + if !ok { + return re.Rule{}, errName + } + r.Name = name + + stat, ok := data["status"].(string) + if !ok { + return re.Rule{}, errStatus + } + st, err := re.ToStatus(stat) + if err != nil { + return re.Rule{}, errors.Wrap(errConvertStatus, err) + } + r.Status = st + + cby, ok := data["created_by"].(string) + if !ok { + return re.Rule{}, errCreatedBy + } + r.CreatedBy = cby + + cat, ok := data["created_at"].(string) + if !ok { + return re.Rule{}, errCreatedAt + } + ct, err := time.Parse(re.TimeLayout, cat) + if err != nil { + return re.Rule{}, errors.Wrap(errCreatedAt, err) + } + r.CreatedAt = ct + + if domain, ok := data["domain"].(string); ok { + r.DomainID = domain + } + + if itags, ok := data["tags"].([]any); ok { + tags, err := rconsumer.ToStrings(itags) + if err != nil { + return re.Rule{}, errors.Wrap(errTags, err) + } + r.Tags = tags + } + + if meta, ok := data["metadata"].(map[string]any); ok { + r.Metadata = meta + } + + if uby, ok := data["updated_by"].(string); ok { + r.UpdatedBy = uby + } + + if uat, ok := data["updated_at"].(string); ok { + ut, err := time.Parse(re.TimeLayout, uat) + if err != nil { + return re.Rule{}, errors.Wrap(errUpdatedAt, err) + } + r.UpdatedAt = ut + } + + if ic, ok := data["input_channel"].(string); ok { + r.InputChannel = ic + } + + if it, ok := data["input_topic"].(string); ok { + r.InputTopic = it + } + + if rawLogic, ok := data["logic"].(map[string]any); ok { + b, err := json.Marshal(rawLogic) + if err != nil { + return re.Rule{}, errors.Wrap(errDecodeLogic, err) + } + if err := json.Unmarshal(b, &r.Logic); err != nil { + return re.Rule{}, errors.Wrap(errDecodeLogic, err) + } + } + + if rawSched, ok := data["schedule"].(map[string]any); ok { + b, err := json.Marshal(rawSched) + if err != nil { + return re.Rule{}, errors.Wrap(errDecodeSchedule, err) + } + var sched schedule.Schedule + if err := json.Unmarshal(b, &sched); err != nil { + return re.Rule{}, errors.Wrap(errDecodeSchedule, err) + } + r.Schedule = sched + } + + return r, nil +} + +func decodeAddRuleEvent(data map[string]any) (re.Rule, []roles.RoleProvision, error) { + r, err := ToRule(data) + if err != nil { + return re.Rule{}, nil, errors.Wrap(errDecodeAddRuleEvent, err) + } + + var rps []roles.RoleProvision + if irps, ok := data["roles_provisioned"].([]any); ok { + rps, err = rconsumer.ToRoleProvisions(irps) + if err != nil { + return re.Rule{}, nil, errors.Wrap(errDecodeAddRuleEvent, err) + } + } + + return r, rps, nil +} + +func decodeUpdateRuleEvent(data map[string]any) (re.Rule, error) { + r, err := ToRule(data) + if err != nil { + return re.Rule{}, errors.Wrap(errDecodeUpdateRuleEvent, err) + } + return r, nil +} + +func decodeUpdateRuleTagsEvent(data map[string]any) (re.Rule, error) { + r, err := ToRule(data) + if err != nil { + return re.Rule{}, errors.Wrap(errDecodeUpdateRuleTagsEvent, err) + } + return r, nil +} + +func decodeUpdateRuleScheduleEvent(data map[string]any) (re.Rule, error) { + r, err := ToRule(data) + if err != nil { + return re.Rule{}, errors.Wrap(errDecodeUpdateRuleScheduleEvent, err) + } + return r, nil +} + +func decodeEnableRuleEvent(data map[string]any) (re.Rule, error) { + r, err := ToRule(data) + if err != nil { + return re.Rule{}, errors.Wrap(errDecodeEnableRuleEvent, err) + } + return r, nil +} + +func decodeDisableRuleEvent(data map[string]any) (re.Rule, error) { + r, err := ToRule(data) + if err != nil { + return re.Rule{}, errors.Wrap(errDecodeDisableRuleEvent, err) + } + return r, nil +} + +func decodeRemoveRuleEvent(data map[string]any) (string, error) { + id, ok := data["id"].(string) + if !ok { + return "", errors.Wrap(errDecodeRemoveRuleEvent, errID) + } + return id, nil +} diff --git a/pkg/re/events/consumer/doc.go b/pkg/re/events/consumer/doc.go new file mode 100644 index 000000000..581353cb3 --- /dev/null +++ b/pkg/re/events/consumer/doc.go @@ -0,0 +1,6 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +// Package consumer contains events consumer for events +// published by the Rules Engine service. +package consumer diff --git a/pkg/re/events/consumer/stream.go b/pkg/re/events/consumer/stream.go new file mode 100644 index 000000000..b6af4f16e --- /dev/null +++ b/pkg/re/events/consumer/stream.go @@ -0,0 +1,193 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package consumer + +import ( + "context" + "log/slog" + + "github.com/absmach/magistrala/re" + "github.com/absmach/supermq/pkg/errors" + "github.com/absmach/supermq/pkg/events" + "github.com/absmach/supermq/pkg/events/store" + rconsumer "github.com/absmach/supermq/pkg/roles/rolemanager/events/consumer" +) + +const ( + stream = "events.supermq.rule.*" + + create = "rule.create" + update = "rule.update" + updateTags = "rule.update_tags" + updateSchedule = "rule.update_schedule" + enable = "rule.enable" + disable = "rule.disable" + remove = "rule.remove" +) + +var ( + errNoOperationKey = errors.New("operation key is not found in event message") + errAddRuleEvent = errors.New("failed to consume rule create event") + errUpdateRuleEvent = errors.New("failed to consume rule update event") + errUpdateRuleTagsEvent = errors.New("failed to consume rule update tags event") + errUpdateRuleScheduleEvent = errors.New("failed to consume rule update schedule event") + errEnableRuleEvent = errors.New("failed to consume rule enable event") + errDisableRuleEvent = errors.New("failed to consume rule disable event") + errRemoveRuleEvent = errors.New("failed to consume rule remove event") +) + +type eventHandler struct { + repo re.Repository + rolesEventHandler rconsumer.EventHandler +} + +func RulesEventsSubscribe(ctx context.Context, repo re.Repository, esURL, esConsumerName string, logger *slog.Logger) error { + subscriber, err := store.NewSubscriber(ctx, esURL, logger) + if err != nil { + return err + } + + subConfig := events.SubscriberConfig{ + Stream: stream, + Consumer: esConsumerName, + Handler: NewEventHandler(repo), + Ordered: true, + } + return subscriber.Subscribe(ctx, subConfig) +} + +// NewEventHandler returns new event store handler. +func NewEventHandler(repo re.Repository) events.EventHandler { + reh := rconsumer.NewEventHandler("rule", repo) + return &eventHandler{ + repo: repo, + rolesEventHandler: reh, + } +} + +func (es *eventHandler) Handle(ctx context.Context, event events.Event) error { + msg, err := event.Encode() + if err != nil { + return err + } + + op, ok := msg["operation"] + if !ok { + return errNoOperationKey + } + + switch op { + case create: + return es.addRuleHandler(ctx, msg) + case update: + return es.updateRuleHandler(ctx, msg) + case updateTags: + return es.updateRuleTagsHandler(ctx, msg) + case updateSchedule: + return es.updateRuleScheduleHandler(ctx, msg) + case enable: + return es.enableRuleHandler(ctx, msg) + case disable: + return es.disableRuleHandler(ctx, msg) + case remove: + return es.removeRuleHandler(ctx, msg) + } + + return es.rolesEventHandler.Handle(ctx, op, msg) +} + +func (es *eventHandler) addRuleHandler(ctx context.Context, data map[string]any) error { + r, rps, err := decodeAddRuleEvent(data) + if err != nil { + return errors.Wrap(errAddRuleEvent, err) + } + + if _, err := es.repo.AddRule(ctx, r); err != nil { + return errors.Wrap(errAddRuleEvent, err) + } + + if _, err := es.repo.AddRoles(ctx, rps); err != nil { + return errors.Wrap(errAddRuleEvent, err) + } + + return nil +} + +func (es *eventHandler) updateRuleHandler(ctx context.Context, data map[string]any) error { + r, err := decodeUpdateRuleEvent(data) + if err != nil { + return errors.Wrap(errUpdateRuleEvent, err) + } + + if _, err := es.repo.UpdateRule(ctx, r); err != nil { + return errors.Wrap(errUpdateRuleEvent, err) + } + + return nil +} + +func (es *eventHandler) updateRuleTagsHandler(ctx context.Context, data map[string]any) error { + r, err := decodeUpdateRuleTagsEvent(data) + if err != nil { + return errors.Wrap(errUpdateRuleTagsEvent, err) + } + + if _, err := es.repo.UpdateRuleTags(ctx, r); err != nil { + return errors.Wrap(errUpdateRuleTagsEvent, err) + } + + return nil +} + +func (es *eventHandler) updateRuleScheduleHandler(ctx context.Context, data map[string]any) error { + r, err := decodeUpdateRuleScheduleEvent(data) + if err != nil { + return errors.Wrap(errUpdateRuleScheduleEvent, err) + } + + if _, err := es.repo.UpdateRuleSchedule(ctx, r); err != nil { + return errors.Wrap(errUpdateRuleScheduleEvent, err) + } + + return nil +} + +func (es *eventHandler) enableRuleHandler(ctx context.Context, data map[string]any) error { + r, err := decodeEnableRuleEvent(data) + if err != nil { + return errors.Wrap(errEnableRuleEvent, err) + } + + if _, err := es.repo.UpdateRuleStatus(ctx, r); err != nil { + return errors.Wrap(errEnableRuleEvent, err) + } + + return nil +} + +func (es *eventHandler) disableRuleHandler(ctx context.Context, data map[string]any) error { + r, err := decodeDisableRuleEvent(data) + if err != nil { + return errors.Wrap(errDisableRuleEvent, err) + } + + if _, err := es.repo.UpdateRuleStatus(ctx, r); err != nil { + return errors.Wrap(errDisableRuleEvent, err) + } + + return nil +} + +func (es *eventHandler) removeRuleHandler(ctx context.Context, data map[string]any) error { + id, err := decodeRemoveRuleEvent(data) + if err != nil { + return errors.Wrap(errRemoveRuleEvent, err) + } + + if err := es.repo.RemoveRule(ctx, id); err != nil { + return errors.Wrap(errRemoveRuleEvent, err) + } + + return nil +} diff --git a/re/api/endpoints.go b/re/api/endpoints.go index 6401495fb..55dbd4a6a 100644 --- a/re/api/endpoints.go +++ b/re/api/endpoints.go @@ -25,7 +25,7 @@ func addRuleEndpoint(s re.Service) endpoint.Endpoint { if err := req.validate(); err != nil { return addRuleRes{}, err } - rule, err := s.AddRule(ctx, session, req.Rule) + rule, _, err := s.AddRule(ctx, session, req.Rule) if err != nil { return addRuleRes{}, err } diff --git a/re/api/endpoints_test.go b/re/api/endpoints_test.go index 30b289f94..a6c8566f8 100644 --- a/re/api/endpoints_test.go +++ b/re/api/endpoints_test.go @@ -26,6 +26,7 @@ import ( authnmocks "github.com/absmach/supermq/pkg/authn/mocks" "github.com/absmach/supermq/pkg/errors" svcerr "github.com/absmach/supermq/pkg/errors/service" + "github.com/absmach/supermq/pkg/roles" "github.com/go-chi/chi/v5" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -236,7 +237,7 @@ func TestAddRuleEndpoint(t *testing.T) { } authCall := authn.On("Authenticate", mock.Anything, tc.token).Return(tc.authnRes, tc.authnErr) - svcCall := svc.On("AddRule", mock.Anything, tc.authnRes, tc.rule).Return(tc.svcRes, tc.svcErr) + svcCall := svc.On("AddRule", mock.Anything, tc.authnRes, tc.rule).Return(tc.svcRes, []roles.RoleProvision{}, tc.svcErr) res, err := req.make() assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err)) diff --git a/re/events/events.go b/re/events/events.go index ffc60228d..5cf0cdde2 100644 --- a/re/events/events.go +++ b/re/events/events.go @@ -9,6 +9,7 @@ import ( "github.com/absmach/magistrala/re" "github.com/absmach/supermq/pkg/authn" "github.com/absmach/supermq/pkg/events" + "github.com/absmach/supermq/pkg/roles" ) const ( @@ -59,7 +60,8 @@ func (bre baseRuleEvent) Encode() map[string]any { } type createRuleEvent struct { - rule re.Rule + rule re.Rule + rolesProvisioned []roles.RoleProvision baseRuleEvent } @@ -70,6 +72,7 @@ func (cre createRuleEvent) Encode() (map[string]any, error) { } maps.Copy(val, cre.baseRuleEvent.Encode()) val["operation"] = ruleCreate + val["roles_provisioned"] = cre.rolesProvisioned return val, nil } diff --git a/re/events/streams.go b/re/events/streams.go index 46d5b07e8..6fda1a5c6 100644 --- a/re/events/streams.go +++ b/re/events/streams.go @@ -11,6 +11,7 @@ import ( "github.com/absmach/supermq/pkg/events" "github.com/absmach/supermq/pkg/events/store" "github.com/absmach/supermq/pkg/messaging" + "github.com/absmach/supermq/pkg/roles" rmEvents "github.com/absmach/supermq/pkg/roles/rolemanager/events" "github.com/go-chi/chi/v5/middleware" ) @@ -44,7 +45,7 @@ func NewEventStoreMiddleware(ctx context.Context, svc re.Service, url string) (r return nil, err } - res := rmEvents.NewRoleManagerEventStore("alarms", supermqPrefix, svc, publisher) + res := rmEvents.NewRoleManagerEventStore("rules", rulePrefix, svc, publisher) return &eventStore{ svc: svc, @@ -53,19 +54,20 @@ func NewEventStoreMiddleware(ctx context.Context, svc re.Service, url string) (r }, nil } -func (es *eventStore) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) { - rule, err := es.svc.AddRule(ctx, session, r) +func (es *eventStore) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, []roles.RoleProvision, error) { + rule, rps, err := es.svc.AddRule(ctx, session, r) if err != nil { - return rule, err + return rule, rps, err } event := createRuleEvent{ - rule: rule, - baseRuleEvent: newBaseRuleEvent(session, middleware.GetReqID(ctx)), + rule: rule, + rolesProvisioned: rps, + baseRuleEvent: newBaseRuleEvent(session, middleware.GetReqID(ctx)), } if err := es.Publish(ctx, CreateStream, event); err != nil { - return rule, err + return rule, rps, err } - return rule, nil + return rule, rps, nil } func (es *eventStore) ListRules(ctx context.Context, session authn.Session, pm re.PageMeta) (re.Page, error) { diff --git a/re/middleware/authorization.go b/re/middleware/authorization.go index 2fc51ee9d..b957934f1 100644 --- a/re/middleware/authorization.go +++ b/re/middleware/authorization.go @@ -14,6 +14,7 @@ import ( "github.com/absmach/supermq/pkg/messaging" "github.com/absmach/supermq/pkg/permissions" "github.com/absmach/supermq/pkg/policies" + "github.com/absmach/supermq/pkg/roles" rolemgr "github.com/absmach/supermq/pkg/roles/rolemanager/middleware" ) @@ -48,9 +49,9 @@ func AuthorizationMiddleware(svc re.Service, authz smqauthz.Authorization, entit }, nil } -func (am *authorizationMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) { +func (am *authorizationMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, []roles.RoleProvision, error) { if err := am.authorize(ctx, operations.OpAddRule, session, policies.DomainType, session.DomainID); err != nil { - return re.Rule{}, errors.Wrap(errDomainCreateRules, err) + return re.Rule{}, nil, errors.Wrap(errDomainCreateRules, err) } return am.svc.AddRule(ctx, session, r) diff --git a/re/middleware/callout.go b/re/middleware/callout.go index 26d0caf94..ae90b2828 100644 --- a/re/middleware/callout.go +++ b/re/middleware/callout.go @@ -15,6 +15,7 @@ import ( "github.com/absmach/supermq/pkg/messaging" "github.com/absmach/supermq/pkg/permissions" "github.com/absmach/supermq/pkg/policies" + "github.com/absmach/supermq/pkg/roles" rolemw "github.com/absmach/supermq/pkg/roles/rolemanager/middleware" ) @@ -47,14 +48,14 @@ func NewCallout(svc re.Service, callout callout.Callout, entitiesOps permissions }, nil } -func (cm *calloutMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) { +func (cm *calloutMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, []roles.RoleProvision, error) { params := map[string]any{ "entities": r, "count": 1, } if err := cm.callOut(ctx, session, operations.OpAddRule, params); err != nil { - return re.Rule{}, err + return re.Rule{}, nil, err } return cm.svc.AddRule(ctx, session, r) diff --git a/re/middleware/logging.go b/re/middleware/logging.go index 3b5abd55b..7319e9fa9 100644 --- a/re/middleware/logging.go +++ b/re/middleware/logging.go @@ -12,6 +12,7 @@ import ( "github.com/absmach/magistrala/re" "github.com/absmach/supermq/pkg/authn" "github.com/absmach/supermq/pkg/messaging" + "github.com/absmach/supermq/pkg/roles" rolemw "github.com/absmach/supermq/pkg/roles/rolemanager/middleware" ) @@ -31,7 +32,7 @@ func LoggingMiddleware(svc re.Service, logger *slog.Logger) re.Service { } } -func (lm *loggingMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (res re.Rule, err error) { +func (lm *loggingMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (res re.Rule, rps []roles.RoleProvision, err error) { defer func(begin time.Time) { args := []any{ slog.String("duration", time.Since(begin).String()), @@ -45,7 +46,8 @@ func (lm *loggingMiddleware) AddRule(ctx context.Context, session authn.Session, } lm.logger.Info("Add rule completed successfully", args...) }(time.Now()) - return lm.svc.AddRule(ctx, session, r) + res, rps, err = lm.svc.AddRule(ctx, session, r) + return } func (lm *loggingMiddleware) ViewRule(ctx context.Context, session authn.Session, id string, withRoles bool) (res re.Rule, err error) { diff --git a/re/middleware/metrics.go b/re/middleware/metrics.go index b929dfb86..1365f3eb2 100644 --- a/re/middleware/metrics.go +++ b/re/middleware/metrics.go @@ -10,6 +10,7 @@ import ( "github.com/absmach/magistrala/re" "github.com/absmach/supermq/pkg/authn" "github.com/absmach/supermq/pkg/messaging" + "github.com/absmach/supermq/pkg/roles" rolemw "github.com/absmach/supermq/pkg/roles/rolemanager/middleware" "github.com/go-kit/kit/metrics" ) @@ -32,7 +33,7 @@ func NewMetricsMiddleware(counter metrics.Counter, latency metrics.Histogram, se } } -func (mm *metricsMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) { +func (mm *metricsMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, []roles.RoleProvision, error) { defer func(begin time.Time) { mm.counter.With("method", "add_rule").Add(1) mm.latency.With("method", "add_rule").Observe(time.Since(begin).Seconds()) diff --git a/re/middleware/tracing.go b/re/middleware/tracing.go index d0ead6a60..c700a466d 100644 --- a/re/middleware/tracing.go +++ b/re/middleware/tracing.go @@ -9,6 +9,7 @@ import ( "github.com/absmach/magistrala/re" "github.com/absmach/supermq/pkg/authn" "github.com/absmach/supermq/pkg/messaging" + "github.com/absmach/supermq/pkg/roles" rolemw "github.com/absmach/supermq/pkg/roles/rolemanager/middleware" smqTracing "github.com/absmach/supermq/pkg/tracing" "go.opentelemetry.io/otel/attribute" @@ -31,7 +32,7 @@ func NewTracingMiddleware(tracer trace.Tracer, svc re.Service) re.Service { } } -func (tm *tracingMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) { +func (tm *tracingMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, []roles.RoleProvision, error) { ctx, span := smqTracing.StartSpan(ctx, tm.tracer, "add_rule", trace.WithAttributes( attribute.String("name", r.Name), attribute.String("domain_id", r.DomainID), diff --git a/re/mocks/service.go b/re/mocks/service.go index 809ed88a3..fbf574bcb 100644 --- a/re/mocks/service.go +++ b/re/mocks/service.go @@ -136,7 +136,7 @@ func (_c *Service_AddRole_Call) RunAndReturn(run func(ctx context.Context, sessi } // AddRule provides a mock function for the type Service -func (_mock *Service) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) { +func (_mock *Service) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, []roles.RoleProvision, error) { ret := _mock.Called(ctx, session, r) if len(ret) == 0 { @@ -144,8 +144,9 @@ func (_mock *Service) AddRule(ctx context.Context, session authn.Session, r re.R } var r0 re.Rule - var r1 error - if returnFunc, ok := ret.Get(0).(func(context.Context, authn.Session, re.Rule) (re.Rule, error)); ok { + var r1 []roles.RoleProvision + var r2 error + if returnFunc, ok := ret.Get(0).(func(context.Context, authn.Session, re.Rule) (re.Rule, []roles.RoleProvision, error)); ok { return returnFunc(ctx, session, r) } if returnFunc, ok := ret.Get(0).(func(context.Context, authn.Session, re.Rule) re.Rule); ok { @@ -153,12 +154,19 @@ func (_mock *Service) AddRule(ctx context.Context, session authn.Session, r re.R } else { r0 = ret.Get(0).(re.Rule) } - if returnFunc, ok := ret.Get(1).(func(context.Context, authn.Session, re.Rule) error); ok { + if returnFunc, ok := ret.Get(1).(func(context.Context, authn.Session, re.Rule) []roles.RoleProvision); ok { r1 = returnFunc(ctx, session, r) } else { - r1 = ret.Error(1) + if ret.Get(1) != nil { + r1 = ret.Get(1).([]roles.RoleProvision) + } } - return r0, r1 + if returnFunc, ok := ret.Get(2).(func(context.Context, authn.Session, re.Rule) error); ok { + r2 = returnFunc(ctx, session, r) + } else { + r2 = ret.Error(2) + } + return r0, r1, r2 } // Service_AddRule_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddRule' @@ -197,12 +205,12 @@ func (_c *Service_AddRule_Call) Run(run func(ctx context.Context, session authn. return _c } -func (_c *Service_AddRule_Call) Return(rule re.Rule, err error) *Service_AddRule_Call { - _c.Call.Return(rule, err) +func (_c *Service_AddRule_Call) Return(rule re.Rule, roleProvisions []roles.RoleProvision, err error) *Service_AddRule_Call { + _c.Call.Return(rule, roleProvisions, err) return _c } -func (_c *Service_AddRule_Call) RunAndReturn(run func(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error)) *Service_AddRule_Call { +func (_c *Service_AddRule_Call) RunAndReturn(run func(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, []roles.RoleProvision, error)) *Service_AddRule_Call { _c.Call.Return(run) return _c } diff --git a/re/rule.go b/re/rule.go index 10d28756a..349ee21f6 100644 --- a/re/rule.go +++ b/re/rule.go @@ -21,6 +21,8 @@ const ( GoType ) +const TimeLayout = "2006-01-02T15:04:05.999999Z" + type ( // ScriptType indicates Runtime type for the future versions // that will support JS or Go runtimes alongside Lua. @@ -66,7 +68,7 @@ func (r Rule) EventEncode() (map[string]any, error) { m := map[string]any{ "id": r.ID, "name": r.Name, - "created_at": r.CreatedAt.Format(time.RFC3339Nano), + "created_at": r.CreatedAt.Format(TimeLayout), "created_by": r.CreatedBy, "schedule": r.Schedule.EventEncode(), "status": r.Status.String(), @@ -81,7 +83,7 @@ func (r Rule) EventEncode() (map[string]any, error) { } if !r.UpdatedAt.IsZero() { - m["updated_at"] = r.UpdatedAt.Format(time.RFC3339Nano) + m["updated_at"] = r.UpdatedAt.Format(TimeLayout) } if r.UpdatedBy != "" { @@ -225,7 +227,7 @@ type Page struct { type Service interface { messaging.MessageHandler - AddRule(ctx context.Context, session authn.Session, r Rule) (Rule, error) + AddRule(ctx context.Context, session authn.Session, r Rule) (Rule, []roles.RoleProvision, error) ViewRule(ctx context.Context, session authn.Session, id string, withRoles bool) (Rule, error) UpdateRule(ctx context.Context, session authn.Session, r Rule) (Rule, error) UpdateRuleTags(ctx context.Context, session authn.Session, r Rule) (Rule, error) diff --git a/re/service.go b/re/service.go index 153df304d..93ad12395 100644 --- a/re/service.go +++ b/re/service.go @@ -58,17 +58,17 @@ func NewService(repo Repository, runInfo chan pkglog.RunInfo, policy policies.Se }, nil } -func (re *re) AddRule(ctx context.Context, session authn.Session, r Rule) (retRule Rule, retErr error) { +func (re *re) AddRule(ctx context.Context, session authn.Session, r Rule) (retRule Rule, retRps []roles.RoleProvision, retErr error) { if r.Logic.Type == GoType && goKeywordRegex.MatchString(r.Logic.Value) { - return Rule{}, errors.Wrap(svcerr.ErrMalformedEntity, ErrGoroutinesNotAllowed) + return Rule{}, nil, errors.Wrap(svcerr.ErrMalformedEntity, ErrGoroutinesNotAllowed) } if r.Logic.Type == GoType && panicRegex.MatchString(r.Logic.Value) { - return Rule{}, errors.Wrap(svcerr.ErrMalformedEntity, ErrPanicNotAllowed) + return Rule{}, nil, errors.Wrap(svcerr.ErrMalformedEntity, ErrPanicNotAllowed) } id, err := re.idp.ID() if err != nil { - return Rule{}, err + return Rule{}, nil, err } now := time.Now().UTC() r.CreatedAt = now @@ -84,7 +84,7 @@ func (re *re) AddRule(ctx context.Context, session authn.Session, r Rule) (retRu rule, err := re.repo.AddRule(ctx, r) if err != nil { - return Rule{}, errors.Wrap(svcerr.ErrCreateEntity, err) + return Rule{}, nil, errors.Wrap(svcerr.ErrCreateEntity, err) } defer func() { @@ -109,12 +109,12 @@ func (re *re) AddRule(ctx context.Context, session authn.Session, r Rule) (retRu }, } - _, err = re.AddNewEntitiesRoles(ctx, session.DomainID, session.UserID, []string{rule.ID}, optionalPolicies, newBuiltInRoleMembers) + rps, err := re.AddNewEntitiesRoles(ctx, session.DomainID, session.UserID, []string{rule.ID}, optionalPolicies, newBuiltInRoleMembers) if err != nil { - return Rule{}, errors.Wrap(svcerr.ErrAddPolicies, err) + return Rule{}, nil, errors.Wrap(svcerr.ErrAddPolicies, err) } - return rule, nil + return rule, rps, nil } func (re *re) ViewRule(ctx context.Context, session authn.Session, id string, withRoles bool) (Rule, error) { diff --git a/re/service_test.go b/re/service_test.go index cd08f59f1..09081435d 100644 --- a/re/service_test.go +++ b/re/service_test.go @@ -494,7 +494,7 @@ func TestAddRule(t *testing.T) { policyCall2 := policies.On("DeletePolicies", context.Background(), mock.Anything).Return(tc.deletePolicies).Maybe() repoCall1 := repo.On("AddRoles", context.Background(), mock.Anything).Return([]roles.RoleProvision{}, tc.addRoleErr) repoCall2 := repo.On("Remove", context.Background(), mock.Anything).Return(tc.deleteErr).Maybe() - res, err := svc.AddRule(context.Background(), tc.session, tc.rule) + res, _, err := svc.AddRule(context.Background(), tc.session, tc.rule) assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err)) if err == nil { assert.NotEmpty(t, res.ID, "expected non-empty result in ID") diff --git a/readers/api/http/transport.go b/readers/api/http/transport.go index 9b7964044..eb011285a 100644 --- a/readers/api/http/transport.go +++ b/readers/api/http/transport.go @@ -227,7 +227,7 @@ func authenticate(ctx context.Context, req listMessagesReq, authn smqauthn.Authe if err != nil { return "", "", err } - if session.Role == smqauthn.AdminRole { + if session.Role == smqauthn.SuperAdminRole { return session.UserID, policies.UserType, nil } diff --git a/scripts/supermq.sh b/scripts/supermq.sh index 9dcb01a0e..3ba648ef6 100755 --- a/scripts/supermq.sh +++ b/scripts/supermq.sh @@ -13,7 +13,7 @@ REPO_URL=https://github.com/absmach/supermq TEMP_DIR="supermq" DOCKER_DIR="docker" DOCKER_DST_DIR="../docker" -DEST_DIR="../docker/supermq-docker" +DEST_DIR="../../docker/supermq-docker" COMBINE_SCHEMA_SCRIPT="./combine-schema.sh" SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"