mirror of
https://github.com/absmach/magistrala.git
synced 2026-06-23 04:10:28 +00:00
61d0427898
Signed-off-by: dusan <borovcanindusan1@gmail.com>
464 lines
12 KiB
Go
464 lines
12 KiB
Go
// Copyright (c) Abstract Machines
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package events
|
|
|
|
import (
|
|
"context"
|
|
|
|
grpcTokenV1 "github.com/absmach/magistrala/api/grpc/token/v1"
|
|
"github.com/absmach/magistrala/pkg/authn"
|
|
"github.com/absmach/magistrala/pkg/events"
|
|
"github.com/absmach/magistrala/pkg/events/store"
|
|
"github.com/absmach/magistrala/users"
|
|
"github.com/go-chi/chi/v5/middleware"
|
|
)
|
|
|
|
const (
|
|
magistralaPrefix = "magistrala."
|
|
createStream = magistralaPrefix + userCreate
|
|
sendVerificationStream = magistralaPrefix + userSendVerification
|
|
verifyEmailStream = magistralaPrefix + userVerifyEmail
|
|
updateStream = magistralaPrefix + userUpdate
|
|
updateRoleStream = magistralaPrefix + userUpdateRole
|
|
updateTagsStream = magistralaPrefix + userUpdateTags
|
|
updateSecretStream = magistralaPrefix + userUpdateSecret
|
|
updateUsernameStream = magistralaPrefix + userUpdateUsername
|
|
updatePictureStream = magistralaPrefix + userUpdateProfilePicture
|
|
UpdateEmailStream = magistralaPrefix + userUpdateEmail
|
|
enableStream = magistralaPrefix + userEnable
|
|
disableStream = magistralaPrefix + userDisable
|
|
viewStream = magistralaPrefix + userView
|
|
viewProfileStream = magistralaPrefix + profileView
|
|
listStream = magistralaPrefix + userList
|
|
searchStream = magistralaPrefix + userSearch
|
|
identifyStream = magistralaPrefix + userIdentify
|
|
issueTokenStream = magistralaPrefix + issueToken
|
|
refreshTokenStream = magistralaPrefix + refreshToken
|
|
revokeRefreshTokenStream = magistralaPrefix + revokeRefreshToken
|
|
resetSecretStream = magistralaPrefix + resetSecret
|
|
sendPasswordResetStream = magistralaPrefix + sendPasswordReset
|
|
oauthStream = magistralaPrefix + oauthCallback
|
|
addPolicyStream = magistralaPrefix + addClientPolicy
|
|
deleteStream = magistralaPrefix + deleteUser
|
|
)
|
|
|
|
var _ users.Service = (*eventStore)(nil)
|
|
|
|
type eventStore struct {
|
|
events.Publisher
|
|
svc users.Service
|
|
}
|
|
|
|
// NewEventStoreMiddleware returns wrapper around users service that sends
|
|
// events to event store.
|
|
func NewEventStoreMiddleware(ctx context.Context, svc users.Service, url string) (users.Service, error) {
|
|
publisher, err := store.NewPublisher(ctx, url, "users-es-pub")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &eventStore{
|
|
svc: svc,
|
|
Publisher: publisher,
|
|
}, nil
|
|
}
|
|
|
|
func (es *eventStore) Register(ctx context.Context, session authn.Session, user users.User, selfRegister bool) (users.User, error) {
|
|
user, err := es.svc.Register(ctx, session, user, selfRegister)
|
|
if err != nil {
|
|
return user, err
|
|
}
|
|
|
|
event := createUserEvent{
|
|
user,
|
|
session,
|
|
middleware.GetReqID(ctx),
|
|
}
|
|
|
|
if err := es.Publish(ctx, createStream, event); err != nil {
|
|
return user, err
|
|
}
|
|
|
|
return user, nil
|
|
}
|
|
|
|
func (es *eventStore) SendVerification(ctx context.Context, session authn.Session) error {
|
|
err := es.svc.SendVerification(ctx, session)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
event := sendVerificationEvent{
|
|
session,
|
|
middleware.GetReqID(ctx),
|
|
}
|
|
|
|
return es.Publish(ctx, sendVerificationStream, event)
|
|
}
|
|
|
|
func (es *eventStore) VerifyEmail(ctx context.Context, verificationToken string) (users.User, error) {
|
|
user, err := es.svc.VerifyEmail(ctx, verificationToken)
|
|
if err != nil {
|
|
return user, err
|
|
}
|
|
|
|
event := verifyEmailEvent{
|
|
email: user.Email,
|
|
userID: user.ID,
|
|
verifiedAt: user.VerifiedAt,
|
|
requestID: middleware.GetReqID(ctx),
|
|
}
|
|
if err := es.Publish(ctx, verifyEmailStream, event); err != nil {
|
|
return user, err
|
|
}
|
|
return user, nil
|
|
}
|
|
|
|
func (es *eventStore) Update(ctx context.Context, session authn.Session, id string, usr users.UserReq) (users.User, error) {
|
|
user, err := es.svc.Update(ctx, session, id, usr)
|
|
if err != nil {
|
|
return user, err
|
|
}
|
|
|
|
return es.update(ctx, session, userUpdate, updateStream, user)
|
|
}
|
|
|
|
func (es *eventStore) UpdateRole(ctx context.Context, session authn.Session, user users.User) (users.User, error) {
|
|
user, err := es.svc.UpdateRole(ctx, session, user)
|
|
if err != nil {
|
|
return user, err
|
|
}
|
|
|
|
return es.update(ctx, session, userUpdateRole, updateRoleStream, user)
|
|
}
|
|
|
|
func (es *eventStore) UpdateTags(ctx context.Context, session authn.Session, id string, usr users.UserReq) (users.User, error) {
|
|
user, err := es.svc.UpdateTags(ctx, session, id, usr)
|
|
if err != nil {
|
|
return user, err
|
|
}
|
|
|
|
return es.update(ctx, session, userUpdateTags, updateTagsStream, user)
|
|
}
|
|
|
|
func (es *eventStore) UpdateSecret(ctx context.Context, session authn.Session, oldSecret, newSecret string) (users.User, error) {
|
|
user, err := es.svc.UpdateSecret(ctx, session, oldSecret, newSecret)
|
|
if err != nil {
|
|
return user, err
|
|
}
|
|
|
|
return es.update(ctx, session, userUpdateSecret, updateSecretStream, user)
|
|
}
|
|
|
|
func (es *eventStore) UpdateUsername(ctx context.Context, session authn.Session, id, username string) (users.User, error) {
|
|
user, err := es.svc.UpdateUsername(ctx, session, id, username)
|
|
if err != nil {
|
|
return user, err
|
|
}
|
|
|
|
event := updateUsernameEvent{
|
|
user,
|
|
session,
|
|
middleware.GetReqID(ctx),
|
|
}
|
|
|
|
if err := es.Publish(ctx, updateUsernameStream, event); err != nil {
|
|
return user, err
|
|
}
|
|
|
|
return user, nil
|
|
}
|
|
|
|
func (es *eventStore) UpdateProfilePicture(ctx context.Context, session authn.Session, id string, usr users.UserReq) (users.User, error) {
|
|
user, err := es.svc.UpdateProfilePicture(ctx, session, id, usr)
|
|
if err != nil {
|
|
return user, err
|
|
}
|
|
|
|
event := updateProfilePictureEvent{
|
|
user,
|
|
session,
|
|
middleware.GetReqID(ctx),
|
|
}
|
|
|
|
if err := es.Publish(ctx, updatePictureStream, event); err != nil {
|
|
return user, err
|
|
}
|
|
|
|
return user, nil
|
|
}
|
|
|
|
func (es *eventStore) UpdateEmail(ctx context.Context, session authn.Session, id, email string) (users.User, error) {
|
|
user, err := es.svc.UpdateEmail(ctx, session, id, email)
|
|
if err != nil {
|
|
return user, err
|
|
}
|
|
|
|
return es.update(ctx, session, userUpdateEmail, UpdateEmailStream, user)
|
|
}
|
|
|
|
func (es *eventStore) update(ctx context.Context, session authn.Session, operation, stream string, user users.User) (users.User, error) {
|
|
event := updateUserEvent{
|
|
user, operation, session, middleware.GetReqID(ctx),
|
|
}
|
|
|
|
if err := es.Publish(ctx, stream, event); err != nil {
|
|
return user, err
|
|
}
|
|
|
|
return user, nil
|
|
}
|
|
|
|
func (es *eventStore) View(ctx context.Context, session authn.Session, id string) (users.User, error) {
|
|
user, err := es.svc.View(ctx, session, id)
|
|
if err != nil {
|
|
return user, err
|
|
}
|
|
|
|
event := viewUserEvent{
|
|
user,
|
|
session,
|
|
middleware.GetReqID(ctx),
|
|
}
|
|
|
|
if err := es.Publish(ctx, viewStream, event); err != nil {
|
|
return user, err
|
|
}
|
|
|
|
return user, nil
|
|
}
|
|
|
|
func (es *eventStore) ViewProfile(ctx context.Context, session authn.Session) (users.User, error) {
|
|
user, err := es.svc.ViewProfile(ctx, session)
|
|
if err != nil {
|
|
return user, err
|
|
}
|
|
|
|
event := viewProfileEvent{
|
|
user,
|
|
session,
|
|
middleware.GetReqID(ctx),
|
|
}
|
|
|
|
if err := es.Publish(ctx, viewProfileStream, event); err != nil {
|
|
return user, err
|
|
}
|
|
|
|
return user, nil
|
|
}
|
|
|
|
func (es *eventStore) ListUsers(ctx context.Context, session authn.Session, pm users.Page) (users.UsersPage, error) {
|
|
cp, err := es.svc.ListUsers(ctx, session, pm)
|
|
if err != nil {
|
|
return cp, err
|
|
}
|
|
event := listUserEvent{
|
|
pm,
|
|
session,
|
|
middleware.GetReqID(ctx),
|
|
}
|
|
|
|
if err := es.Publish(ctx, listStream, event); err != nil {
|
|
return cp, err
|
|
}
|
|
|
|
return cp, nil
|
|
}
|
|
|
|
func (es *eventStore) SearchUsers(ctx context.Context, pm users.Page) (users.UsersPage, error) {
|
|
cp, err := es.svc.SearchUsers(ctx, pm)
|
|
if err != nil {
|
|
return cp, err
|
|
}
|
|
event := searchUserEvent{
|
|
pm,
|
|
middleware.GetReqID(ctx),
|
|
}
|
|
|
|
if err := es.Publish(ctx, searchStream, event); err != nil {
|
|
return cp, err
|
|
}
|
|
|
|
return cp, nil
|
|
}
|
|
|
|
func (es *eventStore) Enable(ctx context.Context, session authn.Session, id string) (users.User, error) {
|
|
user, err := es.svc.Enable(ctx, session, id)
|
|
if err != nil {
|
|
return user, err
|
|
}
|
|
|
|
return es.changeStatus(ctx, session, userEnable, enableStream, user)
|
|
}
|
|
|
|
func (es *eventStore) Disable(ctx context.Context, session authn.Session, id string) (users.User, error) {
|
|
user, err := es.svc.Disable(ctx, session, id)
|
|
if err != nil {
|
|
return user, err
|
|
}
|
|
|
|
return es.changeStatus(ctx, session, userDisable, disableStream, user)
|
|
}
|
|
|
|
func (es *eventStore) changeStatus(ctx context.Context, session authn.Session, operation, stream string, user users.User) (users.User, error) {
|
|
event := changeUserStatusEvent{
|
|
id: user.ID,
|
|
operation: operation,
|
|
updatedAt: user.UpdatedAt,
|
|
updatedBy: user.UpdatedBy,
|
|
status: user.Status.String(),
|
|
Session: session,
|
|
requestID: middleware.GetReqID(ctx),
|
|
}
|
|
|
|
if err := es.Publish(ctx, stream, event); err != nil {
|
|
return user, err
|
|
}
|
|
|
|
return user, nil
|
|
}
|
|
|
|
func (es *eventStore) Identify(ctx context.Context, session authn.Session) (string, error) {
|
|
userID, err := es.svc.Identify(ctx, session)
|
|
if err != nil {
|
|
return userID, err
|
|
}
|
|
|
|
event := identifyUserEvent{
|
|
userID: userID,
|
|
requestID: middleware.GetReqID(ctx),
|
|
}
|
|
|
|
if err := es.Publish(ctx, identifyStream, event); err != nil {
|
|
return userID, err
|
|
}
|
|
|
|
return userID, nil
|
|
}
|
|
|
|
func (es *eventStore) SendPasswordReset(ctx context.Context, email string) error {
|
|
err := es.svc.SendPasswordReset(ctx, email)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
event := sendPasswordResetEvent{
|
|
email: email,
|
|
requestID: middleware.GetReqID(ctx),
|
|
}
|
|
|
|
return es.Publish(ctx, sendPasswordResetStream, event)
|
|
}
|
|
|
|
func (es *eventStore) IssueToken(ctx context.Context, username, secret, description string) (*grpcTokenV1.Token, error) {
|
|
token, err := es.svc.IssueToken(ctx, username, secret, description)
|
|
if err != nil {
|
|
return token, err
|
|
}
|
|
|
|
event := issueTokenEvent{
|
|
username: username,
|
|
requestID: middleware.GetReqID(ctx),
|
|
}
|
|
|
|
if err := es.Publish(ctx, issueTokenStream, event); err != nil {
|
|
return token, err
|
|
}
|
|
|
|
return token, nil
|
|
}
|
|
|
|
func (es *eventStore) RefreshToken(ctx context.Context, session authn.Session, refreshToken string) (*grpcTokenV1.Token, error) {
|
|
token, err := es.svc.RefreshToken(ctx, session, refreshToken)
|
|
if err != nil {
|
|
return token, err
|
|
}
|
|
|
|
event := refreshTokenEvent{
|
|
requestID: middleware.GetReqID(ctx),
|
|
}
|
|
|
|
if err := es.Publish(ctx, refreshTokenStream, event); err != nil {
|
|
return token, err
|
|
}
|
|
|
|
return token, nil
|
|
}
|
|
|
|
func (es *eventStore) RevokeRefreshToken(ctx context.Context, session authn.Session, tokenID string) error {
|
|
err := es.svc.RevokeRefreshToken(ctx, session, tokenID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
event := revokeRefreshTokenEvent{
|
|
tokenID: tokenID,
|
|
requestID: middleware.GetReqID(ctx),
|
|
}
|
|
|
|
return es.Publish(ctx, revokeRefreshTokenStream, event)
|
|
}
|
|
|
|
func (es *eventStore) ListActiveRefreshTokens(ctx context.Context, session authn.Session) (*grpcTokenV1.ListUserRefreshTokensRes, error) {
|
|
return es.svc.ListActiveRefreshTokens(ctx, session)
|
|
}
|
|
|
|
func (es *eventStore) ResetSecret(ctx context.Context, session authn.Session, secret string) error {
|
|
if err := es.svc.ResetSecret(ctx, session, secret); err != nil {
|
|
return err
|
|
}
|
|
|
|
event := resetSecretEvent{
|
|
requestID: middleware.GetReqID(ctx),
|
|
}
|
|
|
|
return es.Publish(ctx, resetSecretStream, event)
|
|
}
|
|
|
|
func (es *eventStore) OAuthCallback(ctx context.Context, user users.User) (users.User, error) {
|
|
token, err := es.svc.OAuthCallback(ctx, user)
|
|
if err != nil {
|
|
return token, err
|
|
}
|
|
|
|
event := oauthCallbackEvent{
|
|
userID: user.ID,
|
|
requestID: middleware.GetReqID(ctx),
|
|
}
|
|
|
|
if err := es.Publish(ctx, oauthStream, event); err != nil {
|
|
return token, err
|
|
}
|
|
|
|
return token, nil
|
|
}
|
|
|
|
func (es *eventStore) Delete(ctx context.Context, session authn.Session, id string) error {
|
|
if err := es.svc.Delete(ctx, session, id); err != nil {
|
|
return err
|
|
}
|
|
|
|
event := deleteUserEvent{
|
|
id: id,
|
|
Session: session,
|
|
requestID: middleware.GetReqID(ctx),
|
|
}
|
|
|
|
return es.Publish(ctx, deleteStream, event)
|
|
}
|
|
|
|
func (es *eventStore) OAuthAddUserPolicy(ctx context.Context, user users.User) error {
|
|
if err := es.svc.OAuthAddUserPolicy(ctx, user); err != nil {
|
|
return err
|
|
}
|
|
|
|
event := addUserPolicyEvent{
|
|
id: user.ID,
|
|
role: user.Role.String(),
|
|
requestID: middleware.GetReqID(ctx),
|
|
}
|
|
|
|
return es.Publish(ctx, addPolicyStream, event)
|
|
}
|