SMQ-2740 - Create events streams per service per action type (#2744)

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>
This commit is contained in:
Felix Gateru
2025-04-02 18:14:03 +03:00
committed by GitHub
parent 7a3a55d399
commit 3931ecabdb
30 changed files with 396 additions and 273 deletions
+21 -20
View File
@@ -17,10 +17,13 @@ const (
channelPrefix = "channel."
channelCreate = channelPrefix + "create"
channelUpdate = channelPrefix + "update"
channelChangeStatus = channelPrefix + "change_status"
channelUpdateTags = channelPrefix + "update_tags"
channelEnable = channelPrefix + "enable"
channelDisable = channelPrefix + "disable"
channelRemove = channelPrefix + "remove"
channelView = channelPrefix + "view"
channelList = channelPrefix + "list"
channelListByUser = channelPrefix + "list_by_user"
channelConnect = channelPrefix + "connect"
channelDisconnect = channelPrefix + "disconnect"
channelSetParent = channelPrefix + "set_parent"
@@ -30,7 +33,7 @@ const (
var (
_ events.Event = (*createChannelEvent)(nil)
_ events.Event = (*updateChannelEvent)(nil)
_ events.Event = (*changeStatusChannelEvent)(nil)
_ events.Event = (*changeChannelStatusEvent)(nil)
_ events.Event = (*viewChannelEvent)(nil)
_ events.Event = (*listChannelEvent)(nil)
_ events.Event = (*removeChannelEvent)(nil)
@@ -74,14 +77,14 @@ func (cce createChannelEvent) Encode() (map[string]interface{}, error) {
type updateChannelEvent struct {
channels.Channel
operation string
authn.Session
operation string
requestID string
}
func (uce updateChannelEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": channelUpdate,
"operation": uce.operation,
"updated_at": uce.UpdatedAt,
"updated_by": uce.UpdatedBy,
"domain": uce.DomainID,
@@ -90,9 +93,6 @@ func (uce updateChannelEvent) Encode() (map[string]interface{}, error) {
"super_admin": uce.SuperAdmin,
"request_id": uce.requestID,
}
if uce.operation != "" {
val["operation"] = channelUpdate + "_" + uce.operation
}
if uce.ID != "" {
val["id"] = uce.ID
@@ -116,8 +116,9 @@ func (uce updateChannelEvent) Encode() (map[string]interface{}, error) {
return val, nil
}
type changeStatusChannelEvent struct {
type changeChannelStatusEvent struct {
id string
operation string
status string
updatedAt time.Time
updatedBy string
@@ -125,18 +126,18 @@ type changeStatusChannelEvent struct {
requestID string
}
func (rce changeStatusChannelEvent) Encode() (map[string]interface{}, error) {
func (cse changeChannelStatusEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"operation": channelChangeStatus,
"id": rce.id,
"status": rce.status,
"updated_at": rce.updatedAt,
"updated_by": rce.updatedBy,
"domain": rce.DomainID,
"user_id": rce.UserID,
"token_type": rce.Type.String(),
"super_admin": rce.SuperAdmin,
"request_id": rce.requestID,
"operation": cse.operation,
"id": cse.id,
"status": cse.status,
"updated_at": cse.updatedAt,
"updated_by": cse.updatedBy,
"domain": cse.DomainID,
"user_id": cse.UserID,
"token_type": cse.Type.String(),
"super_admin": cse.SuperAdmin,
"request_id": cse.requestID,
}, nil
}
@@ -235,7 +236,7 @@ type listUserChannelsEvent struct {
func (luce listUserChannelsEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": channelList,
"operation": channelListByUser,
"req_user_id": luce.userID,
"total": luce.Total,
"offset": luce.Offset,
+53 -32
View File
@@ -16,7 +16,22 @@ import (
"github.com/go-chi/chi/v5/middleware"
)
const streamID = "supermq.channels"
const (
supermqPrefix = "supermq."
createStream = supermqPrefix + channelCreate
updateStream = supermqPrefix + channelUpdate
updateTagsStream = supermqPrefix + channelUpdateTags
enableStream = supermqPrefix + channelEnable
disableStream = supermqPrefix + channelDisable
removeStream = supermqPrefix + channelRemove
viewStream = supermqPrefix + channelView
listStream = supermqPrefix + channelList
listByUserStream = supermqPrefix + channelListByUser
connectStream = supermqPrefix + channelConnect
disconnectStream = supermqPrefix + channelDisconnect
setParentStream = supermqPrefix + channelSetParent
removeParentStream = supermqPrefix + channelRemoveParent
)
var _ channels.Service = (*eventStore)(nil)
@@ -29,7 +44,7 @@ type eventStore struct {
// NewEventStoreMiddleware returns wrapper around clients service that sends
// events to event store.
func NewEventStoreMiddleware(ctx context.Context, svc channels.Service, url string) (channels.Service, error) {
publisher, err := store.NewPublisher(ctx, url, streamID)
publisher, err := store.NewPublisher(ctx, url)
if err != nil {
return nil, err
}
@@ -55,7 +70,7 @@ func (es *eventStore) CreateChannels(ctx context.Context, session authn.Session,
Session: session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, createStream, event); err != nil {
return chs, rps, err
}
}
@@ -64,32 +79,37 @@ func (es *eventStore) CreateChannels(ctx context.Context, session authn.Session,
}
func (es *eventStore) UpdateChannel(ctx context.Context, session authn.Session, ch channels.Channel) (channels.Channel, error) {
chann, err := es.svc.UpdateChannel(ctx, session, ch)
ch, err := es.svc.UpdateChannel(ctx, session, ch)
if err != nil {
return chann, err
return ch, err
}
return es.update(ctx, "", session, chann)
event := updateChannelEvent{
Channel: ch,
Session: session,
operation: channelUpdate,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, updateStream, event); err != nil {
return ch, err
}
return ch, nil
}
func (es *eventStore) UpdateChannelTags(ctx context.Context, session authn.Session, ch channels.Channel) (channels.Channel, error) {
chann, err := es.svc.UpdateChannelTags(ctx, session, ch)
ch, err := es.svc.UpdateChannelTags(ctx, session, ch)
if err != nil {
return chann, err
return ch, err
}
return es.update(ctx, "tags", session, chann)
}
func (es *eventStore) update(ctx context.Context, operation string, session authn.Session, ch channels.Channel) (channels.Channel, error) {
event := updateChannelEvent{
Channel: ch,
operation: operation,
Session: session,
operation: channelUpdateTags,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, updateTagsStream, event); err != nil {
return ch, err
}
@@ -107,7 +127,7 @@ func (es *eventStore) ViewChannel(ctx context.Context, session authn.Session, id
Session: session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, viewStream, event); err != nil {
return chann, err
}
@@ -124,7 +144,7 @@ func (es *eventStore) ListChannels(ctx context.Context, session authn.Session, p
Session: session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, listStream, event); err != nil {
return cp, err
}
@@ -142,7 +162,7 @@ func (es *eventStore) ListUserChannels(ctx context.Context, session authn.Sessio
Session: session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, listByUserStream, event); err != nil {
return cp, err
}
@@ -150,33 +170,34 @@ func (es *eventStore) ListUserChannels(ctx context.Context, session authn.Sessio
}
func (es *eventStore) EnableChannel(ctx context.Context, session authn.Session, id string) (channels.Channel, error) {
cli, err := es.svc.EnableChannel(ctx, session, id)
ch, err := es.svc.EnableChannel(ctx, session, id)
if err != nil {
return cli, err
return ch, err
}
return es.changeStatus(ctx, session, cli)
return es.changeStatus(ctx, session, channelEnable, enableStream, ch)
}
func (es *eventStore) DisableChannel(ctx context.Context, session authn.Session, id string) (channels.Channel, error) {
cli, err := es.svc.DisableChannel(ctx, session, id)
ch, err := es.svc.DisableChannel(ctx, session, id)
if err != nil {
return cli, err
return ch, err
}
return es.changeStatus(ctx, session, cli)
return es.changeStatus(ctx, session, channelDisable, disableStream, ch)
}
func (es *eventStore) changeStatus(ctx context.Context, session authn.Session, ch channels.Channel) (channels.Channel, error) {
event := changeStatusChannelEvent{
func (es *eventStore) changeStatus(ctx context.Context, session authn.Session, operation, stream string, ch channels.Channel) (channels.Channel, error) {
event := changeChannelStatusEvent{
id: ch.ID,
operation: operation,
updatedAt: ch.UpdatedAt,
updatedBy: ch.UpdatedBy,
status: ch.Status.String(),
Session: session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, stream, event); err != nil {
return ch, err
}
@@ -194,7 +215,7 @@ func (es *eventStore) RemoveChannel(ctx context.Context, session authn.Session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, removeStream, event); err != nil {
return err
}
@@ -214,7 +235,7 @@ func (es *eventStore) Connect(ctx context.Context, session authn.Session, chIDs,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, connectStream, event); err != nil {
return err
}
@@ -234,7 +255,7 @@ func (es *eventStore) Disconnect(ctx context.Context, session authn.Session, chI
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, disconnectStream, event); err != nil {
return err
}
@@ -253,7 +274,7 @@ func (es *eventStore) SetParentGroup(ctx context.Context, session authn.Session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, setParentStream, event); err != nil {
return err
}
@@ -271,7 +292,7 @@ func (es *eventStore) RemoveParentGroup(ctx context.Context, session authn.Sessi
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, removeParentStream, event); err != nil {
return err
}
+20 -19
View File
@@ -16,12 +16,16 @@ const (
clientPrefix = "client."
clientCreate = clientPrefix + "create"
clientUpdate = clientPrefix + "update"
clientChangeStatus = clientPrefix + "change_status"
clientUpdateTags = clientPrefix + "update_tags"
clientUpdateSecret = clientPrefix + "update_secret"
clientEnable = clientPrefix + "enable"
clientDisable = clientPrefix + "disable"
clientRemove = clientPrefix + "remove"
clientView = clientPrefix + "view"
clientViewPerms = clientPrefix + "view_perms"
clientList = clientPrefix + "list"
clientListByGroup = clientPrefix + "list_by_channel"
clientListByUser = clientPrefix + "list_by_user"
clientIdentify = clientPrefix + "identify"
clientAuthorize = clientPrefix + "authorize"
clientSetParent = clientPrefix + "set_parent"
@@ -31,7 +35,7 @@ const (
var (
_ events.Event = (*createClientEvent)(nil)
_ events.Event = (*updateClientEvent)(nil)
_ events.Event = (*changeStatusClientEvent)(nil)
_ events.Event = (*changeClientStatusEvent)(nil)
_ events.Event = (*viewClientEvent)(nil)
_ events.Event = (*viewClientPermsEvent)(nil)
_ events.Event = (*listClientEvent)(nil)
@@ -88,7 +92,7 @@ type updateClientEvent struct {
func (uce updateClientEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": clientUpdate,
"operation": uce.operation,
"updated_at": uce.UpdatedAt,
"updated_by": uce.UpdatedBy,
"domain": uce.DomainID,
@@ -97,10 +101,6 @@ func (uce updateClientEvent) Encode() (map[string]interface{}, error) {
"super_admin": uce.SuperAdmin,
"request_id": uce.requestID,
}
if uce.operation != "" {
val["operation"] = clientUpdate + "_" + uce.operation
}
if uce.ID != "" {
val["id"] = uce.ID
}
@@ -126,8 +126,9 @@ func (uce updateClientEvent) Encode() (map[string]interface{}, error) {
return val, nil
}
type changeStatusClientEvent struct {
type changeClientStatusEvent struct {
id string
operation string
status string
updatedAt time.Time
updatedBy string
@@ -135,18 +136,18 @@ type changeStatusClientEvent struct {
requestID string
}
func (rce changeStatusClientEvent) Encode() (map[string]interface{}, error) {
func (cse changeClientStatusEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"operation": clientChangeStatus,
"id": rce.id,
"status": rce.status,
"updated_at": rce.updatedAt,
"updated_by": rce.updatedBy,
"domain": rce.DomainID,
"user_id": rce.UserID,
"token_type": rce.Type.String(),
"super_admin": rce.SuperAdmin,
"request_id": rce.requestID,
"operation": cse.operation,
"id": cse.id,
"status": cse.status,
"updated_at": cse.updatedAt,
"updated_by": cse.updatedBy,
"domain": cse.DomainID,
"user_id": cse.UserID,
"token_type": cse.Type.String(),
"super_admin": cse.SuperAdmin,
"request_id": cse.requestID,
}, nil
}
+38 -19
View File
@@ -15,7 +15,25 @@ import (
"github.com/go-chi/chi/v5/middleware"
)
const streamID = "supermq.clients"
const (
supermqPrefix = "supermq."
createStream = supermqPrefix + clientCreate
updateStream = supermqPrefix + clientUpdate
updateTagsStream = supermqPrefix + clientUpdateTags
updateSecretStream = supermqPrefix + clientUpdateSecret
enableStream = supermqPrefix + clientEnable
disableStream = supermqPrefix + clientDisable
removeStream = supermqPrefix + clientRemove
viewStream = supermqPrefix + clientView
viewPermsStream = supermqPrefix + clientViewPerms
listStream = supermqPrefix + clientList
listByUserStream = supermqPrefix + clientListByUser
listByGroupStream = supermqPrefix + clientListByGroup
identifyStream = supermqPrefix + clientIdentify
authorizeStream = supermqPrefix + clientAuthorize
setParentStream = supermqPrefix + clientSetParent
removeParentStream = supermqPrefix + clientRemoveParent
)
var _ clients.Service = (*eventStore)(nil)
@@ -28,7 +46,7 @@ type eventStore struct {
// NewEventStoreMiddleware returns wrapper around clients service that sends
// events to event store.
func NewEventStoreMiddleware(ctx context.Context, svc clients.Service, url string) (clients.Service, error) {
publisher, err := store.NewPublisher(ctx, url, streamID)
publisher, err := store.NewPublisher(ctx, url)
if err != nil {
return nil, err
}
@@ -54,7 +72,7 @@ func (es *eventStore) CreateClients(ctx context.Context, session authn.Session,
Session: session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, createStream, event); err != nil {
return clis, rps, err
}
}
@@ -68,7 +86,7 @@ func (es *eventStore) Update(ctx context.Context, session authn.Session, client
return cli, err
}
return es.update(ctx, session, "", cli)
return es.update(ctx, session, clientUpdate, updateStream, cli)
}
func (es *eventStore) UpdateTags(ctx context.Context, session authn.Session, client clients.Client) (clients.Client, error) {
@@ -77,7 +95,7 @@ func (es *eventStore) UpdateTags(ctx context.Context, session authn.Session, cli
return cli, err
}
return es.update(ctx, session, "tags", cli)
return es.update(ctx, session, clientUpdateTags, updateTagsStream, cli)
}
func (es *eventStore) UpdateSecret(ctx context.Context, session authn.Session, id, key string) (clients.Client, error) {
@@ -86,10 +104,10 @@ func (es *eventStore) UpdateSecret(ctx context.Context, session authn.Session, i
return cli, err
}
return es.update(ctx, session, "secret", cli)
return es.update(ctx, session, clientUpdateSecret, updateSecretStream, cli)
}
func (es *eventStore) update(ctx context.Context, session authn.Session, operation string, client clients.Client) (clients.Client, error) {
func (es *eventStore) update(ctx context.Context, session authn.Session, operation, stream string, client clients.Client) (clients.Client, error) {
event := updateClientEvent{
Client: client,
operation: operation,
@@ -97,7 +115,7 @@ func (es *eventStore) update(ctx context.Context, session authn.Session, operati
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, stream, event); err != nil {
return client, err
}
@@ -115,7 +133,7 @@ func (es *eventStore) View(ctx context.Context, session authn.Session, id string
Session: session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, viewStream, event); err != nil {
return cli, err
}
@@ -132,7 +150,7 @@ func (es *eventStore) ListClients(ctx context.Context, session authn.Session, pm
session,
middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, listStream, event); err != nil {
return cp, err
}
@@ -150,7 +168,7 @@ func (es *eventStore) ListUserClients(ctx context.Context, session authn.Session
session,
middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, listByUserStream, event); err != nil {
return cp, err
}
@@ -163,7 +181,7 @@ func (es *eventStore) Enable(ctx context.Context, session authn.Session, id stri
return cli, err
}
return es.changeStatus(ctx, session, cli)
return es.changeStatus(ctx, session, clientEnable, enableStream, cli)
}
func (es *eventStore) Disable(ctx context.Context, session authn.Session, id string) (clients.Client, error) {
@@ -172,19 +190,20 @@ func (es *eventStore) Disable(ctx context.Context, session authn.Session, id str
return cli, err
}
return es.changeStatus(ctx, session, cli)
return es.changeStatus(ctx, session, clientDisable, disableStream, cli)
}
func (es *eventStore) changeStatus(ctx context.Context, session authn.Session, cli clients.Client) (clients.Client, error) {
event := changeStatusClientEvent{
func (es *eventStore) changeStatus(ctx context.Context, session authn.Session, operation, stream string, cli clients.Client) (clients.Client, error) {
event := changeClientStatusEvent{
id: cli.ID,
operation: operation,
updatedAt: cli.UpdatedAt,
updatedBy: cli.UpdatedBy,
status: cli.Status.String(),
Session: session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, stream, event); err != nil {
return cli, err
}
@@ -202,7 +221,7 @@ func (es *eventStore) Delete(ctx context.Context, session authn.Session, id stri
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, removeStream, event); err != nil {
return err
}
@@ -221,7 +240,7 @@ func (es *eventStore) SetParentGroup(ctx context.Context, session authn.Session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, setParentStream, event); err != nil {
return err
}
@@ -239,7 +258,7 @@ func (es *eventStore) RemoveParentGroup(ctx context.Context, session authn.Sessi
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, removeParentStream, event); err != nil {
return err
}
+15 -17
View File
@@ -13,23 +13,21 @@ import (
)
const (
domainPrefix = "domain."
domainCreate = domainPrefix + "create"
domainRetrieve = domainPrefix + "retrieve"
domainRetrieveStatus = domainPrefix + "retrieve_status"
domainUpdate = domainPrefix + "update"
domainEnable = domainPrefix + "enable"
domainDisable = domainPrefix + "disable"
domainFreeze = domainPrefix + "freeze"
domainList = domainPrefix + "list"
domainUserDelete = domainPrefix + "user_delete"
invitationPrefix = "invitation."
invitationSend = invitationPrefix + "send"
invitationAccept = invitationPrefix + "accept"
invitationReject = invitationPrefix + "reject"
invitationList = invitationPrefix + "list"
invitationRetrieve = invitationPrefix + "retrieve"
invitationDelete = invitationPrefix + "delete"
domainPrefix = "domain."
domainCreate = domainPrefix + "create"
domainRetrieve = domainPrefix + "retrieve"
domainUpdate = domainPrefix + "update"
domainEnable = domainPrefix + "enable"
domainDisable = domainPrefix + "disable"
domainFreeze = domainPrefix + "freeze"
domainList = domainPrefix + "list"
invitationPrefix = "invitation."
invitationSend = invitationPrefix + "send"
invitationAccept = invitationPrefix + "accept"
invitationReject = invitationPrefix + "reject"
invitationList = invitationPrefix + "list"
invitationRetrieve = invitationPrefix + "retrieve"
invitationDelete = invitationPrefix + "delete"
)
var (
+30 -15
View File
@@ -15,7 +15,22 @@ import (
"github.com/go-chi/chi/v5/middleware"
)
const streamID = "supermq.domains"
const (
supermqPrefix = "supermq."
createStream = supermqPrefix + domainCreate
retrieveStream = supermqPrefix + domainRetrieve
updateStream = supermqPrefix + domainUpdate
enableStream = supermqPrefix + domainEnable
disableStream = supermqPrefix + domainDisable
freezeStream = supermqPrefix + domainFreeze
listStream = supermqPrefix + domainList
sendInvitationStream = supermqPrefix + invitationSend
acceptInvitationStream = supermqPrefix + invitationAccept
rejectInvitationStream = supermqPrefix + invitationReject
listInvitationsStream = supermqPrefix + invitationList
retrieveInvitationStream = supermqPrefix + invitationRetrieve
deleteInvitationStream = supermqPrefix + invitationDelete
)
var _ domains.Service = (*eventStore)(nil)
@@ -28,7 +43,7 @@ type eventStore struct {
// NewEventStoreMiddleware returns wrapper around auth service that sends
// events to event store.
func NewEventStoreMiddleware(ctx context.Context, svc domains.Service, url string) (domains.Service, error) {
publisher, err := store.NewPublisher(ctx, url, streamID)
publisher, err := store.NewPublisher(ctx, url)
if err != nil {
return nil, err
}
@@ -55,7 +70,7 @@ func (es *eventStore) CreateDomain(ctx context.Context, session authn.Session, d
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, createStream, event); err != nil {
return domain, rps, err
}
@@ -74,7 +89,7 @@ func (es *eventStore) RetrieveDomain(ctx context.Context, session authn.Session,
middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, retrieveStream, event); err != nil {
return domain, err
}
@@ -93,7 +108,7 @@ func (es *eventStore) UpdateDomain(ctx context.Context, session authn.Session, i
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, updateStream, event); err != nil {
return domain, err
}
@@ -114,7 +129,7 @@ func (es *eventStore) EnableDomain(ctx context.Context, session authn.Session, i
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, enableStream, event); err != nil {
return domain, err
}
@@ -135,7 +150,7 @@ func (es *eventStore) DisableDomain(ctx context.Context, session authn.Session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, disableStream, event); err != nil {
return domain, err
}
@@ -156,7 +171,7 @@ func (es *eventStore) FreezeDomain(ctx context.Context, session authn.Session, i
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, freezeStream, event); err != nil {
return domain, err
}
@@ -178,7 +193,7 @@ func (es *eventStore) ListDomains(ctx context.Context, session authn.Session, p
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, listStream, event); err != nil {
return dp, err
}
@@ -195,7 +210,7 @@ func (es *eventStore) SendInvitation(ctx context.Context, session authn.Session,
session: session,
}
return es.Publish(ctx, event)
return es.Publish(ctx, sendInvitationStream, event)
}
func (es *eventStore) ViewInvitation(ctx context.Context, session authn.Session, userID, domainID string) (domains.Invitation, error) {
@@ -212,7 +227,7 @@ func (es *eventStore) ViewInvitation(ctx context.Context, session authn.Session,
session: session,
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, retrieveInvitationStream, event); err != nil {
return invitation, err
}
@@ -230,7 +245,7 @@ func (es *eventStore) ListInvitations(ctx context.Context, session authn.Session
session: session,
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, listInvitationsStream, event); err != nil {
return ip, err
}
@@ -247,7 +262,7 @@ func (es *eventStore) AcceptInvitation(ctx context.Context, session authn.Sessio
session: session,
}
return es.Publish(ctx, event)
return es.Publish(ctx, acceptInvitationStream, event)
}
func (es *eventStore) RejectInvitation(ctx context.Context, session authn.Session, domainID string) error {
@@ -260,7 +275,7 @@ func (es *eventStore) RejectInvitation(ctx context.Context, session authn.Sessio
session: session,
}
return es.Publish(ctx, event)
return es.Publish(ctx, rejectInvitationStream, event)
}
func (es *eventStore) DeleteInvitation(ctx context.Context, session authn.Session, inviteeUserID, domainID string) error {
@@ -274,5 +289,5 @@ func (es *eventStore) DeleteInvitation(ctx context.Context, session authn.Sessio
session: session,
}
return es.Publish(ctx, event)
return es.Publish(ctx, deleteInvitationStream, event)
}
+8 -6
View File
@@ -12,11 +12,12 @@ import (
"github.com/absmach/supermq/pkg/roles"
)
var (
const (
groupPrefix = "group."
groupCreate = groupPrefix + "create"
groupUpdate = groupPrefix + "update"
groupChangeStatus = groupPrefix + "change_status"
groupEnable = groupPrefix + "enable"
groupDisable = groupPrefix + "disable"
groupView = groupPrefix + "view"
groupList = groupPrefix + "list"
groupListUserGroups = groupPrefix + "list_user_groups"
@@ -34,7 +35,7 @@ var (
var (
_ events.Event = (*createGroupEvent)(nil)
_ events.Event = (*updateGroupEvent)(nil)
_ events.Event = (*changeStatusGroupEvent)(nil)
_ events.Event = (*changeGroupStatusEvent)(nil)
_ events.Event = (*viewGroupEvent)(nil)
_ events.Event = (*deleteGroupEvent)(nil)
_ events.Event = (*viewGroupEvent)(nil)
@@ -132,8 +133,9 @@ func (uge updateGroupEvent) Encode() (map[string]interface{}, error) {
return val, nil
}
type changeStatusGroupEvent struct {
type changeGroupStatusEvent struct {
id string
operation string
status string
updatedAt time.Time
updatedBy string
@@ -141,9 +143,9 @@ type changeStatusGroupEvent struct {
requestID string
}
func (rge changeStatusGroupEvent) Encode() (map[string]interface{}, error) {
func (rge changeGroupStatusEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"operation": groupChangeStatus,
"operation": rge.operation,
"id": rge.id,
"status": rge.status,
"updated_at": rge.updatedAt,
+39 -20
View File
@@ -15,7 +15,25 @@ import (
"github.com/go-chi/chi/v5/middleware"
)
const streamID = "supermq.groups"
const (
supermqPrefix = "supermq."
createStream = supermqPrefix + groupCreate
updateStream = supermqPrefix + groupUpdate
enableStream = supermqPrefix + groupEnable
disableStream = supermqPrefix + groupDisable
viewStream = supermqPrefix + groupView
listStream = supermqPrefix + groupList
listUserGroupsStream = supermqPrefix + groupListUserGroups
removeStream = supermqPrefix + groupRemove
retrieveHierarchyStream = supermqPrefix + groupRetrieveGroupHierarchy
addParentStream = supermqPrefix + groupAddParentGroup
removeParentStream = supermqPrefix + groupRemoveParentGroup
viewParentStream = supermqPrefix + groupViewParentGroup
addChildrenStream = supermqPrefix + groupAddChildrenGroups
removeChildrenStream = supermqPrefix + groupRemoveChildrenGroups
removeAllChildrenStream = supermqPrefix + groupRemoveAllChildrenGroups
listChildrenStream = supermqPrefix + groupListChildrenGroups
)
var _ groups.Service = (*eventStore)(nil)
@@ -28,7 +46,7 @@ type eventStore struct {
// NewEventStoreMiddleware returns wrapper around clients service that sends
// events to event store.
func New(ctx context.Context, svc groups.Service, url string) (groups.Service, error) {
publisher, err := store.NewPublisher(ctx, url, streamID)
publisher, err := store.NewPublisher(ctx, url)
if err != nil {
return nil, err
}
@@ -54,7 +72,7 @@ func (es eventStore) CreateGroup(ctx context.Context, session authn.Session, gro
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, createStream, event); err != nil {
return group, rps, err
}
@@ -73,7 +91,7 @@ func (es eventStore) UpdateGroup(ctx context.Context, session authn.Session, gro
middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, updateStream, event); err != nil {
return group, err
}
@@ -91,7 +109,7 @@ func (es eventStore) ViewGroup(ctx context.Context, session authn.Session, id st
middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, viewStream, event); err != nil {
return group, err
}
@@ -112,7 +130,7 @@ func (es eventStore) ListGroups(ctx context.Context, session authn.Session, pm g
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, listStream, event); err != nil {
return gp, err
}
@@ -133,7 +151,7 @@ func (es eventStore) ListUserGroups(ctx context.Context, session authn.Session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, listUserGroupsStream, event); err != nil {
return gp, err
}
@@ -146,7 +164,7 @@ func (es eventStore) EnableGroup(ctx context.Context, session authn.Session, id
return group, err
}
return es.changeStatus(ctx, session, group)
return es.changeStatus(ctx, session, groupEnable, enableStream, group)
}
func (es eventStore) DisableGroup(ctx context.Context, session authn.Session, id string) (groups.Group, error) {
@@ -155,12 +173,13 @@ func (es eventStore) DisableGroup(ctx context.Context, session authn.Session, id
return group, err
}
return es.changeStatus(ctx, session, group)
return es.changeStatus(ctx, session, groupDisable, disableStream, group)
}
func (es eventStore) changeStatus(ctx context.Context, session authn.Session, group groups.Group) (groups.Group, error) {
event := changeStatusGroupEvent{
func (es eventStore) changeStatus(ctx context.Context, session authn.Session, operation, stream string, group groups.Group) (groups.Group, error) {
event := changeGroupStatusEvent{
id: group.ID,
operation: operation,
updatedAt: group.UpdatedAt,
updatedBy: group.UpdatedBy,
status: group.Status.String(),
@@ -168,7 +187,7 @@ func (es eventStore) changeStatus(ctx context.Context, session authn.Session, gr
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, stream, event); err != nil {
return group, err
}
@@ -179,7 +198,7 @@ func (es eventStore) DeleteGroup(ctx context.Context, session authn.Session, id
if err := es.svc.DeleteGroup(ctx, session, id); err != nil {
return err
}
if err := es.Publish(ctx, deleteGroupEvent{
if err := es.Publish(ctx, removeStream, deleteGroupEvent{
id: id,
Session: session,
requestID: middleware.GetReqID(ctx),
@@ -194,7 +213,7 @@ func (es eventStore) RetrieveGroupHierarchy(ctx context.Context, session authn.S
if err != nil {
return g, err
}
if err := es.Publish(ctx, retrieveGroupHierarchyEvent{id: id, Session: session, HierarchyPageMeta: hm, requestID: middleware.GetReqID(ctx)}); err != nil {
if err := es.Publish(ctx, retrieveHierarchyStream, retrieveGroupHierarchyEvent{id: id, Session: session, HierarchyPageMeta: hm, requestID: middleware.GetReqID(ctx)}); err != nil {
return g, err
}
return g, nil
@@ -204,7 +223,7 @@ func (es eventStore) AddParentGroup(ctx context.Context, session authn.Session,
if err := es.svc.AddParentGroup(ctx, session, id, parentID); err != nil {
return err
}
if err := es.Publish(ctx, addParentGroupEvent{id: id, parentID: parentID, Session: session, requestID: middleware.GetReqID(ctx)}); err != nil {
if err := es.Publish(ctx, addParentStream, addParentGroupEvent{id: id, parentID: parentID, Session: session, requestID: middleware.GetReqID(ctx)}); err != nil {
return err
}
return nil
@@ -214,7 +233,7 @@ func (es eventStore) RemoveParentGroup(ctx context.Context, session authn.Sessio
if err := es.svc.RemoveParentGroup(ctx, session, id); err != nil {
return err
}
if err := es.Publish(ctx, removeParentGroupEvent{id: id, Session: session, requestID: middleware.GetReqID(ctx)}); err != nil {
if err := es.Publish(ctx, removeParentStream, removeParentGroupEvent{id: id, Session: session, requestID: middleware.GetReqID(ctx)}); err != nil {
return err
}
return nil
@@ -224,7 +243,7 @@ func (es eventStore) AddChildrenGroups(ctx context.Context, session authn.Sessio
if err := es.svc.AddChildrenGroups(ctx, session, id, childrenGroupIDs); err != nil {
return err
}
if err := es.Publish(ctx, addChildrenGroupsEvent{id: id, Session: session, childrenIDs: childrenGroupIDs, requestID: middleware.GetReqID(ctx)}); err != nil {
if err := es.Publish(ctx, addChildrenStream, addChildrenGroupsEvent{id: id, Session: session, childrenIDs: childrenGroupIDs, requestID: middleware.GetReqID(ctx)}); err != nil {
return err
}
return nil
@@ -234,7 +253,7 @@ func (es eventStore) RemoveChildrenGroups(ctx context.Context, session authn.Ses
if err := es.svc.RemoveChildrenGroups(ctx, session, id, childrenGroupIDs); err != nil {
return err
}
if err := es.Publish(ctx, removeChildrenGroupsEvent{id: id, Session: session, childrenIDs: childrenGroupIDs, requestID: middleware.GetReqID(ctx)}); err != nil {
if err := es.Publish(ctx, removeChildrenStream, removeChildrenGroupsEvent{id: id, Session: session, childrenIDs: childrenGroupIDs, requestID: middleware.GetReqID(ctx)}); err != nil {
return err
}
@@ -245,7 +264,7 @@ func (es eventStore) RemoveAllChildrenGroups(ctx context.Context, session authn.
if err := es.svc.RemoveAllChildrenGroups(ctx, session, id); err != nil {
return err
}
if err := es.Publish(ctx, removeAllChildrenGroupsEvent{id: id, Session: session, requestID: middleware.GetReqID(ctx)}); err != nil {
if err := es.Publish(ctx, removeAllChildrenStream, removeAllChildrenGroupsEvent{id: id, Session: session, requestID: middleware.GetReqID(ctx)}); err != nil {
return err
}
return nil
@@ -256,7 +275,7 @@ func (es eventStore) ListChildrenGroups(ctx context.Context, session authn.Sessi
if err != nil {
return g, err
}
if err := es.Publish(ctx, listChildrenGroupsEvent{
if err := es.Publish(ctx, listChildrenStream, listChildrenGroupsEvent{
id: id,
domainID: session.DomainID,
startLevel: startLevel,
+10 -5
View File
@@ -15,7 +15,12 @@ import (
"github.com/absmach/supermq/pkg/events/store"
)
const streamID = "supermq.mqtt"
const (
supermqPrefix = "supermq."
subscribeStream = supermqPrefix + clientSubscribe
connectStream = supermqPrefix + clientConnect
disconnectStream = supermqPrefix + clientDisconnect
)
var (
errFailedSession = errors.New("failed to obtain session from context")
@@ -33,7 +38,7 @@ type eventStore struct {
// NewEventStoreMiddleware returns middleware around mGate service that sends
// events to event store.
func NewEventStoreMiddleware(ctx context.Context, handler session.Handler, url, instance string) (session.Handler, error) {
publisher, err := store.NewPublisher(ctx, url, streamID)
publisher, err := store.NewPublisher(ctx, url)
if err != nil {
return nil, err
}
@@ -61,7 +66,7 @@ func (es *eventStore) AuthConnect(ctx context.Context) error {
instance: es.instance,
}
return es.ep.Publish(ctx, ev)
return es.ep.Publish(ctx, connectStream, ev)
}
func (es *eventStore) AuthPublish(ctx context.Context, topic *string, payload *[]byte) error {
@@ -103,7 +108,7 @@ func (es *eventStore) Subscribe(ctx context.Context, topics *[]string) error {
subtopic: subtopic,
}
if err := es.ep.Publish(ctx, ev); err != nil {
if err := es.ep.Publish(ctx, subscribeStream, ev); err != nil {
return err
}
}
@@ -132,7 +137,7 @@ func (es *eventStore) Disconnect(ctx context.Context) error {
instance: es.instance,
}
return es.ep.Publish(ctx, ev)
return es.ep.Publish(ctx, disconnectStream, ev)
}
func parseTopic(topic string) (string, string, error) {
+1 -1
View File
@@ -17,7 +17,7 @@ import (
)
const (
stream = "events.supermq.domains"
stream = "events.supermq.domain.*"
create = "domain.create"
update = "domain.update"
+1 -1
View File
@@ -26,7 +26,7 @@ type Event interface {
// Publisher specifies events publishing API.
type Publisher interface {
// Publish publishes event to stream.
Publish(ctx context.Context, event Event) error
Publish(ctx context.Context, stream string, event Event) error
// Close gracefully closes event publisher's connection.
Close() error
+10 -9
View File
@@ -86,16 +86,16 @@ func (_c *Publisher_Close_Call) RunAndReturn(run func() error) *Publisher_Close_
}
// Publish provides a mock function for the type Publisher
func (_mock *Publisher) Publish(ctx context.Context, event events.Event) error {
ret := _mock.Called(ctx, event)
func (_mock *Publisher) Publish(ctx context.Context, stream string, event events.Event) error {
ret := _mock.Called(ctx, stream, event)
if len(ret) == 0 {
panic("no return value specified for Publish")
}
var r0 error
if returnFunc, ok := ret.Get(0).(func(context.Context, events.Event) error); ok {
r0 = returnFunc(ctx, event)
if returnFunc, ok := ret.Get(0).(func(context.Context, string, events.Event) error); ok {
r0 = returnFunc(ctx, stream, event)
} else {
r0 = ret.Error(0)
}
@@ -109,14 +109,15 @@ type Publisher_Publish_Call struct {
// Publish is a helper method to define mock.On call
// - ctx
// - stream
// - event
func (_e *Publisher_Expecter) Publish(ctx interface{}, event interface{}) *Publisher_Publish_Call {
return &Publisher_Publish_Call{Call: _e.mock.On("Publish", ctx, event)}
func (_e *Publisher_Expecter) Publish(ctx interface{}, stream interface{}, event interface{}) *Publisher_Publish_Call {
return &Publisher_Publish_Call{Call: _e.mock.On("Publish", ctx, stream, event)}
}
func (_c *Publisher_Publish_Call) Run(run func(ctx context.Context, event events.Event)) *Publisher_Publish_Call {
func (_c *Publisher_Publish_Call) Run(run func(ctx context.Context, stream string, event events.Event)) *Publisher_Publish_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(events.Event))
run(args[0].(context.Context), args[1].(string), args[2].(events.Event))
})
return _c
}
@@ -126,7 +127,7 @@ func (_c *Publisher_Publish_Call) Return(err error) *Publisher_Publish_Call {
return _c
}
func (_c *Publisher_Publish_Call) RunAndReturn(run func(ctx context.Context, event events.Event) error) *Publisher_Publish_Call {
func (_c *Publisher_Publish_Call) RunAndReturn(run func(ctx context.Context, stream string, event events.Event) error) *Publisher_Publish_Call {
_c.Call.Return(run)
return _c
}
+3 -5
View File
@@ -22,10 +22,9 @@ type pubEventStore struct {
url string
conn *nats.Conn
publisher messaging.Publisher
stream string
}
func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) {
func NewPublisher(ctx context.Context, url string) (events.Publisher, error) {
conn, err := nats.Connect(url, nats.MaxReconnects(maxReconnects), nats.ReconnectBufSize(reconnectBufSize))
if err != nil {
return nil, err
@@ -47,13 +46,12 @@ func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, er
url: url,
conn: conn,
publisher: publisher,
stream: stream,
}
return es, nil
}
func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error {
func (es *pubEventStore) Publish(ctx context.Context, stream string, event events.Event) error {
values, err := event.Encode()
if err != nil {
return err
@@ -69,7 +67,7 @@ func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error
Payload: data,
}
return es.publisher.Publish(ctx, es.stream, record)
return es.publisher.Publish(ctx, stream, record)
}
func (es *pubEventStore) Close() error {
+5 -5
View File
@@ -50,10 +50,10 @@ func (te testEvent) Encode() (map[string]interface{}, error) {
}
func TestPublish(t *testing.T) {
_, err := nats.NewPublisher(context.Background(), "http://invaliurl.com", stream)
_, err := nats.NewPublisher(context.Background(), "http://invaliurl.com")
assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err)
publisher, err := nats.NewPublisher(context.Background(), natsURL, stream)
publisher, err := nats.NewPublisher(context.Background(), natsURL)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))
defer publisher.Close()
@@ -133,7 +133,7 @@ func TestPublish(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
event := testEvent{Data: tc.event}
err := publisher.Publish(context.Background(), event)
err := publisher.Publish(context.Background(), stream, event)
switch tc.err {
case nil:
receivedEvent := <-eventsChan
@@ -244,7 +244,7 @@ func TestPubsub(t *testing.T) {
}
func TestUnavailablePublish(t *testing.T) {
publisher, err := nats.NewPublisher(context.Background(), natsURL, stream)
publisher, err := nats.NewPublisher(context.Background(), natsURL)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))
subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger)
@@ -300,7 +300,7 @@ func generateRandomEvent() testEvent {
func spawnGoroutines(publisher events.Publisher, t *testing.T) {
for i := 0; i < numEvents; i++ {
go func() {
err := publisher.Publish(context.Background(), generateRandomEvent())
err := publisher.Publish(context.Background(), stream, generateRandomEvent())
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
}()
}
+1 -1
View File
@@ -45,7 +45,7 @@ func TestMain(m *testing.M) {
natsURL = fmt.Sprintf("nats://%s:%s", "localhost", container.GetPort("4222/tcp"))
if err := pool.Retry(func() error {
_, err = nats.NewPublisher(context.Background(), natsURL, stream)
_, err = nats.NewPublisher(context.Background(), natsURL)
return err
}); err != nil {
log.Fatalf("Could not connect to docker: %s", err)
+3 -5
View File
@@ -17,10 +17,9 @@ import (
type pubEventStore struct {
conn *amqp.Connection
publisher messaging.Publisher
stream string
}
func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) {
func NewPublisher(ctx context.Context, url string) (events.Publisher, error) {
conn, err := amqp.Dial(url)
if err != nil {
return nil, err
@@ -41,13 +40,12 @@ func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, er
es := &pubEventStore{
conn: conn,
publisher: publisher,
stream: stream,
}
return es, nil
}
func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error {
func (es *pubEventStore) Publish(ctx context.Context, stream string, event events.Event) error {
values, err := event.Encode()
if err != nil {
return err
@@ -63,7 +61,7 @@ func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error
Payload: data,
}
return es.publisher.Publish(ctx, es.stream, record)
return es.publisher.Publish(ctx, stream, record)
}
func (es *pubEventStore) Close() error {
+5 -5
View File
@@ -50,10 +50,10 @@ func (te testEvent) Encode() (map[string]interface{}, error) {
}
func TestPublish(t *testing.T) {
_, err := rabbitmq.NewPublisher(context.Background(), "http://invaliurl.com", stream)
_, err := rabbitmq.NewPublisher(context.Background(), "http://invaliurl.com")
assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err)
publisher, err := rabbitmq.NewPublisher(context.Background(), rabbitmqURL, stream)
publisher, err := rabbitmq.NewPublisher(context.Background(), rabbitmqURL)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))
defer publisher.Close()
@@ -133,7 +133,7 @@ func TestPublish(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
event := testEvent{Data: tc.event}
err := publisher.Publish(context.Background(), event)
err := publisher.Publish(context.Background(), stream, event)
switch tc.err {
case nil:
receivedEvent := <-eventsChan
@@ -245,7 +245,7 @@ func TestPubsub(t *testing.T) {
}
func TestUnavailablePublish(t *testing.T) {
publisher, err := rabbitmq.NewPublisher(context.Background(), rabbitmqURL, stream)
publisher, err := rabbitmq.NewPublisher(context.Background(), rabbitmqURL)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))
subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger)
@@ -301,7 +301,7 @@ func generateRandomEvent() testEvent {
func spawnGoroutines(publisher events.Publisher, t *testing.T) {
for i := 0; i < numEvents; i++ {
go func() {
err := publisher.Publish(context.Background(), generateRandomEvent())
err := publisher.Publish(context.Background(), stream, generateRandomEvent())
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
}()
}
+1 -1
View File
@@ -44,7 +44,7 @@ func TestMain(m *testing.M) {
rabbitmqURL = fmt.Sprintf("amqp://%s:%s", "localhost", container.GetPort("5672/tcp"))
if err := pool.Retry(func() error {
_, err = rabbitmq.NewPublisher(context.Background(), rabbitmqURL, stream)
_, err = rabbitmq.NewPublisher(context.Background(), rabbitmqURL)
return err
}); err != nil {
log.Fatalf("Could not connect to docker: %s", err)
+3 -5
View File
@@ -16,12 +16,11 @@ import (
type pubEventStore struct {
client *redis.Client
unpublishedEvents chan *redis.XAddArgs
stream string
mu sync.Mutex
flushPeriod time.Duration
}
func NewPublisher(ctx context.Context, url, stream string, flushPeriod time.Duration) (events.Publisher, error) {
func NewPublisher(ctx context.Context, url string, flushPeriod time.Duration) (events.Publisher, error) {
opts, err := redis.ParseURL(url)
if err != nil {
return nil, err
@@ -30,7 +29,6 @@ func NewPublisher(ctx context.Context, url, stream string, flushPeriod time.Dura
es := &pubEventStore{
client: redis.NewClient(opts),
unpublishedEvents: make(chan *redis.XAddArgs, events.MaxUnpublishedEvents),
stream: eventsPrefix + stream,
flushPeriod: flushPeriod,
}
@@ -39,7 +37,7 @@ func NewPublisher(ctx context.Context, url, stream string, flushPeriod time.Dura
return es, nil
}
func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error {
func (es *pubEventStore) Publish(ctx context.Context, stream string, event events.Event) error {
values, err := event.Encode()
if err != nil {
return err
@@ -52,7 +50,7 @@ func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error
}
record := &redis.XAddArgs{
Stream: es.stream,
Stream: eventsPrefix + stream,
MaxLen: events.MaxEventStreamLen,
Approx: true,
Values: map[string]interface{}{"data": string(data)},
+5 -5
View File
@@ -42,10 +42,10 @@ func TestPublish(t *testing.T) {
err := redisClient.FlushAll(context.Background()).Err()
assert.Nil(t, err, fmt.Sprintf("got unexpected error on flushing redis: %s", err))
_, err = redis.NewPublisher(context.Background(), "http://invaliurl.com", stream, events.UnpublishedEventsCheckInterval)
_, err = redis.NewPublisher(context.Background(), "http://invaliurl.com", events.UnpublishedEventsCheckInterval)
assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err)
publisher, err := redis.NewPublisher(context.Background(), redisURL, stream, events.UnpublishedEventsCheckInterval)
publisher, err := redis.NewPublisher(context.Background(), redisURL, events.UnpublishedEventsCheckInterval)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))
defer publisher.Close()
@@ -125,7 +125,7 @@ func TestPublish(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
event := testEvent{Data: tc.event}
err := publisher.Publish(context.Background(), event)
err := publisher.Publish(context.Background(), stream, event)
switch tc.err {
case nil:
receivedEvent := <-eventsChan
@@ -241,7 +241,7 @@ func TestPubsub(t *testing.T) {
}
func TestUnavailablePublish(t *testing.T) {
publisher, err := redis.NewPublisher(context.Background(), redisURL, stream, time.Second)
publisher, err := redis.NewPublisher(context.Background(), redisURL, time.Second)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))
subcriber, err := redis.NewSubscriber(redisURL, logger)
@@ -296,7 +296,7 @@ func generateRandomEvent() testEvent {
func spawnGoroutines(publisher events.Publisher, t *testing.T) {
for i := 0; i < numEvents; i++ {
go func() {
err := publisher.Publish(context.Background(), generateRandomEvent())
err := publisher.Publish(context.Background(), stream, generateRandomEvent())
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
}()
}
+2 -2
View File
@@ -22,8 +22,8 @@ func init() {
log.Println("The binary was build using nats as the events store")
}
func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) {
pb, err := nats.NewPublisher(ctx, url, stream)
func NewPublisher(ctx context.Context, url string) (events.Publisher, error) {
pb, err := nats.NewPublisher(ctx, url)
if err != nil {
return nil, err
}
+2 -2
View File
@@ -22,8 +22,8 @@ func init() {
log.Println("The binary was build using rabbitmq as the events store")
}
func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) {
pb, err := rabbitmq.NewPublisher(ctx, url, stream)
func NewPublisher(ctx context.Context, url string) (events.Publisher, error) {
pb, err := rabbitmq.NewPublisher(ctx, url)
if err != nil {
return nil, err
}
+2 -2
View File
@@ -22,8 +22,8 @@ func init() {
log.Println("The binary was build using redis as the events store")
}
func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) {
pb, err := redis.NewPublisher(ctx, url, stream, events.UnpublishedEventsCheckInterval)
func NewPublisher(ctx context.Context, url string) (events.Publisher, error) {
pb, err := redis.NewPublisher(ctx, url, events.UnpublishedEventsCheckInterval)
if err != nil {
return nil, err
}
+4 -3
View File
@@ -16,11 +16,12 @@ import (
)
const (
stream = "events.supermq.groups"
stream = "events.supermq.group.*"
create = "group.create"
update = "group.update"
changeStatus = "group.change_status"
enable = "group.enable"
disable = "group.disable"
remove = "group.remove"
addParentGroup = "group.add_parent_group"
removeParentGroup = "group.remove_parent_group"
@@ -87,7 +88,7 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error {
return es.createGroupHandler(ctx, msg)
case update:
return es.updateGroupHandler(ctx, msg)
case changeStatus:
case enable, disable:
return es.changeStatusGroupHandler(ctx, msg)
case remove:
return es.removeGroupHandler(ctx, msg)
+2 -2
View File
@@ -19,7 +19,7 @@ type publisherES struct {
}
func NewPublisherMiddleware(ctx context.Context, pub messaging.Publisher, url string) (messaging.Publisher, error) {
publisher, err := store.NewPublisher(ctx, url, streamID)
publisher, err := store.NewPublisher(ctx, url)
if err != nil {
return nil, err
}
@@ -41,7 +41,7 @@ func (es *publisherES) Publish(ctx context.Context, topic string, msg *messaging
subtopic: msg.Subtopic,
}
return es.ep.Publish(ctx, me)
return es.ep.Publish(ctx, publishStream, me)
}
func (es *publisherES) Close() error {
+10 -5
View File
@@ -11,7 +11,12 @@ import (
"github.com/absmach/supermq/pkg/messaging"
)
const streamID = "supermq.messaging"
const (
supermqPrefix = "supermq."
publishStream = supermqPrefix + "publish"
subscribeStream = supermqPrefix + "subscribe"
unsubscribeStream = supermqPrefix + "unsubscribe"
)
var _ messaging.PubSub = (*pubsubES)(nil)
@@ -21,7 +26,7 @@ type pubsubES struct {
}
func NewPubSubMiddleware(ctx context.Context, pubsub messaging.PubSub, url string) (messaging.PubSub, error) {
publisher, err := store.NewPublisher(ctx, url, streamID)
publisher, err := store.NewPublisher(ctx, url)
if err != nil {
return nil, err
}
@@ -43,7 +48,7 @@ func (es *pubsubES) Publish(ctx context.Context, topic string, msg *messaging.Me
subtopic: msg.Subtopic,
}
return es.ep.Publish(ctx, me)
return es.ep.Publish(ctx, publishStream, me)
}
func (es *pubsubES) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) error {
@@ -58,7 +63,7 @@ func (es *pubsubES) Subscribe(ctx context.Context, cfg messaging.SubscriberConfi
topic: cfg.Topic,
}
return es.ep.Publish(ctx, se)
return es.ep.Publish(ctx, subscribeStream, se)
}
func (es *pubsubES) Unsubscribe(ctx context.Context, id string, topic string) error {
@@ -72,7 +77,7 @@ func (es *pubsubES) Unsubscribe(ctx context.Context, id string, topic string) er
topic: topic,
}
return es.ep.Publish(ctx, se)
return es.ep.Publish(ctx, unsubscribeStream, se)
}
func (es *pubsubES) Close() error {
@@ -185,6 +185,10 @@ func (es *EventHandler) AddEntityRoleMembersHandler(ctx context.Context, data ma
if !ok {
return fmt.Errorf(errAddEntityRoleMembersEvent, es.entityType, errRoleID)
}
entityID, ok := data["entity_id"].(string)
if !ok {
return fmt.Errorf(errRemoveEntityRoleAllMembersEvent, es.entityType, errEntityID)
}
imems, ok := data["members"].([]interface{})
if !ok {
return fmt.Errorf(errAddEntityRoleMembersEvent, es.entityType, errMembers)
@@ -194,7 +198,7 @@ func (es *EventHandler) AddEntityRoleMembersHandler(ctx context.Context, data ma
return fmt.Errorf(errAddEntityRoleMembersEvent, es.entityType, err)
}
if _, err := es.repo.RoleAddMembers(ctx, roles.Role{ID: id}, mems); err != nil {
if _, err := es.repo.RoleAddMembers(ctx, roles.Role{ID: id, EntityID: entityID}, mems); err != nil {
return fmt.Errorf(errAddEntityRoleMembersEvent, es.entityType, err)
}
+26 -19
View File
@@ -11,6 +11,11 @@ import (
"github.com/absmach/supermq/pkg/roles"
)
const (
supermqPrefix = "supermq."
rolesPrefix = "roles"
)
var _ roles.RoleManager = (*RoleManagerEventStore)(nil)
type RoleManagerEventStore struct {
@@ -18,6 +23,7 @@ type RoleManagerEventStore struct {
svc roles.RoleManager
operationPrefix string
svcName string
streamID string
}
// NewEventStoreMiddleware returns wrapper around auth service that sends
@@ -27,6 +33,7 @@ func NewRoleManagerEventStore(svcName, operationPrefix string, svc roles.RoleMan
svcName: svcName,
operationPrefix: operationPrefix,
svc: svc,
streamID: supermqPrefix + operationPrefix + rolesPrefix,
Publisher: publisher,
}
}
@@ -41,7 +48,7 @@ func (rmes *RoleManagerEventStore) AddRole(ctx context.Context, session authn.Se
operationPrefix: rmes.operationPrefix,
RoleProvision: nrp,
}
if err := rmes.Publish(ctx, e); err != nil {
if err := rmes.Publish(ctx, rmes.streamID, e); err != nil {
return nrp, err
}
return nrp, nil
@@ -56,7 +63,7 @@ func (rmes *RoleManagerEventStore) RemoveRole(ctx context.Context, session authn
roleID: roleID,
entityID: entityID,
}
if err := rmes.Publish(ctx, e); err != nil {
if err := rmes.Publish(ctx, rmes.streamID, e); err != nil {
return err
}
return nil
@@ -72,7 +79,7 @@ func (rmes *RoleManagerEventStore) UpdateRoleName(ctx context.Context, session a
operationPrefix: rmes.operationPrefix,
Role: ro,
}
if err := rmes.Publish(ctx, e); err != nil {
if err := rmes.Publish(ctx, rmes.streamID, e); err != nil {
return ro, err
}
return ro, nil
@@ -87,7 +94,7 @@ func (rmes *RoleManagerEventStore) RetrieveRole(ctx context.Context, session aut
operationPrefix: rmes.operationPrefix,
Role: ro,
}
if err := rmes.Publish(ctx, e); err != nil {
if err := rmes.Publish(ctx, rmes.streamID, e); err != nil {
return ro, err
}
return ro, nil
@@ -105,7 +112,7 @@ func (rmes *RoleManagerEventStore) RetrieveAllRoles(ctx context.Context, session
limit: limit,
offset: offset,
}
if err := rmes.Publish(ctx, e); err != nil {
if err := rmes.Publish(ctx, rmes.streamID, e); err != nil {
return rp, err
}
return rp, nil
@@ -119,7 +126,7 @@ func (rmes *RoleManagerEventStore) ListAvailableActions(ctx context.Context, ses
e := listAvailableActionsEvent{
operationPrefix: rmes.operationPrefix,
}
if err := rmes.Publish(ctx, e); err != nil {
if err := rmes.Publish(ctx, rmes.streamID, e); err != nil {
return actions, err
}
return actions, nil
@@ -136,7 +143,7 @@ func (rmes *RoleManagerEventStore) RoleAddActions(ctx context.Context, session a
roleID: roleID,
actions: actions,
}
if err := rmes.Publish(ctx, e); err != nil {
if err := rmes.Publish(ctx, rmes.streamID, e); err != nil {
return actions, err
}
return actions, nil
@@ -153,7 +160,7 @@ func (rmes *RoleManagerEventStore) RoleListActions(ctx context.Context, session
entityID: entityID,
roleID: roleID,
}
if err := rmes.Publish(ctx, e); err != nil {
if err := rmes.Publish(ctx, rmes.streamID, e); err != nil {
return actions, err
}
return actions, nil
@@ -172,7 +179,7 @@ func (rmes *RoleManagerEventStore) RoleCheckActionsExists(ctx context.Context, s
actions: actions,
isAllExists: isAllExists,
}
if err := rmes.Publish(ctx, e); err != nil {
if err := rmes.Publish(ctx, rmes.streamID, e); err != nil {
return isAllExists, err
}
return isAllExists, nil
@@ -189,7 +196,7 @@ func (rmes *RoleManagerEventStore) RoleRemoveActions(ctx context.Context, sessio
roleID: roleID,
actions: actions,
}
if err := rmes.Publish(ctx, e); err != nil {
if err := rmes.Publish(ctx, rmes.streamID, e); err != nil {
return err
}
return nil
@@ -205,7 +212,7 @@ func (rmes *RoleManagerEventStore) RoleRemoveAllActions(ctx context.Context, ses
entityID: entityID,
roleID: roleID,
}
if err := rmes.Publish(ctx, e); err != nil {
if err := rmes.Publish(ctx, rmes.streamID, e); err != nil {
return err
}
return nil
@@ -223,7 +230,7 @@ func (rmes *RoleManagerEventStore) RoleAddMembers(ctx context.Context, session a
roleID: roleID,
members: members,
}
if err := rmes.Publish(ctx, e); err != nil {
if err := rmes.Publish(ctx, rmes.streamID, e); err != nil {
return mems, err
}
return mems, nil
@@ -242,7 +249,7 @@ func (rmes *RoleManagerEventStore) RoleListMembers(ctx context.Context, session
limit: limit,
offset: offset,
}
if err := rmes.Publish(ctx, e); err != nil {
if err := rmes.Publish(ctx, rmes.streamID, e); err != nil {
return mp, err
}
return mp, nil
@@ -260,7 +267,7 @@ func (rmes *RoleManagerEventStore) RoleCheckMembersExists(ctx context.Context, s
roleID: roleID,
members: members,
}
if err := rmes.Publish(ctx, e); err != nil {
if err := rmes.Publish(ctx, rmes.streamID, e); err != nil {
return isAllExists, err
}
return isAllExists, nil
@@ -277,7 +284,7 @@ func (rmes *RoleManagerEventStore) RoleRemoveMembers(ctx context.Context, sessio
roleID: roleID,
members: members,
}
if err := rmes.Publish(ctx, e); err != nil {
if err := rmes.Publish(ctx, rmes.streamID, e); err != nil {
return err
}
return nil
@@ -293,7 +300,7 @@ func (rmes *RoleManagerEventStore) RoleRemoveAllMembers(ctx context.Context, ses
entityID: entityID,
roleID: roleID,
}
if err := rmes.Publish(ctx, e); err != nil {
if err := rmes.Publish(ctx, rmes.streamID, e); err != nil {
return err
}
return nil
@@ -311,7 +318,7 @@ func (rmes *RoleManagerEventStore) ListEntityMembers(ctx context.Context, sessio
limit: pageQuery.Limit,
offset: pageQuery.Offset,
}
if err := rmes.Publish(ctx, e); err != nil {
if err := rmes.Publish(ctx, rmes.streamID, e); err != nil {
return mems, err
}
return mems, nil
@@ -327,7 +334,7 @@ func (rmes *RoleManagerEventStore) RemoveEntityMembers(ctx context.Context, sess
entityID: entityID,
members: members,
}
if err := rmes.Publish(ctx, e); err != nil {
if err := rmes.Publish(ctx, rmes.streamID, e); err != nil {
return err
}
return nil
@@ -342,7 +349,7 @@ func (rmes *RoleManagerEventStore) RemoveMemberFromAllRoles(ctx context.Context,
operationPrefix: rmes.operationPrefix,
memberID: memberID,
}
if err := rmes.Publish(ctx, e); err != nil {
if err := rmes.Publish(ctx, rmes.streamID, e); err != nil {
return err
}
return nil
+14 -11
View File
@@ -15,7 +15,14 @@ const (
userPrefix = "user."
userCreate = userPrefix + "create"
userUpdate = userPrefix + "update"
userRemove = userPrefix + "remove"
userUpdateRole = userPrefix + "update_role"
userUpdateTags = userPrefix + "update_tags"
userUpdateSecret = userPrefix + "update_secret"
userUpdateUsername = userPrefix + "update_username"
userUpdateProfilePicture = userPrefix + "update_profile_picture"
userUpdateEmail = userPrefix + "update_email"
userEnable = userPrefix + "enable"
userDisable = userPrefix + "disable"
userView = userPrefix + "view"
profileView = userPrefix + "view_profile"
userList = userPrefix + "list"
@@ -30,8 +37,6 @@ const (
oauthCallback = userPrefix + "oauth_callback"
addClientPolicy = userPrefix + "add_policy"
deleteUser = userPrefix + "delete"
userUpdateUsername = userPrefix + "update_username"
userUpdateProfilePicture = userPrefix + "update_profile_picture"
)
var (
@@ -39,7 +44,7 @@ var (
_ events.Event = (*updateUserEvent)(nil)
_ events.Event = (*updateProfilePictureEvent)(nil)
_ events.Event = (*updateUsernameEvent)(nil)
_ events.Event = (*removeUserEvent)(nil)
_ events.Event = (*changeUserStatusEvent)(nil)
_ events.Event = (*viewUserEvent)(nil)
_ events.Event = (*viewProfileEvent)(nil)
_ events.Event = (*listUserEvent)(nil)
@@ -103,16 +108,13 @@ type updateUserEvent struct {
func (uce updateUserEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": userUpdate,
"operation": uce.operation,
"updated_at": uce.UpdatedAt,
"updated_by": uce.UpdatedBy,
"token_type": uce.Type.String(),
"super_admin": uce.SuperAdmin,
"request_id": uce.requestID,
}
if uce.operation != "" {
val["operation"] = userUpdate + "_" + uce.operation
}
if uce.ID != "" {
val["id"] = uce.ID
@@ -203,8 +205,9 @@ func (uppe updateProfilePictureEvent) Encode() (map[string]interface{}, error) {
return val, nil
}
type removeUserEvent struct {
type changeUserStatusEvent struct {
id string
operation string
status string
updatedAt time.Time
updatedBy string
@@ -212,9 +215,9 @@ type removeUserEvent struct {
requestID string
}
func (rce removeUserEvent) Encode() (map[string]interface{}, error) {
func (rce changeUserStatusEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"operation": userRemove,
"operation": rce.operation,
"id": rce.id,
"status": rce.status,
"updated_at": rce.updatedAt,
+57 -30
View File
@@ -14,7 +14,33 @@ import (
"github.com/go-chi/chi/v5/middleware"
)
const streamID = "supermq.users"
const (
supermqPrefix = "supermq."
createStream = supermqPrefix + userCreate
updateStream = supermqPrefix + userUpdate
updateRoleStream = supermqPrefix + userUpdateRole
updateTagsStream = supermqPrefix + userUpdateTags
updateSecretStream = supermqPrefix + userUpdateSecret
updateUsernameStream = supermqPrefix + userUpdateUsername
updatePictureStream = supermqPrefix + userUpdateProfilePicture
UpdateEmailStream = supermqPrefix + userUpdateEmail
enableStream = supermqPrefix + userEnable
disableStream = supermqPrefix + userDisable
viewStream = supermqPrefix + userView
viewProfileStream = supermqPrefix + profileView
listStream = supermqPrefix + userList
searchStream = supermqPrefix + userSearch
listByGroupStream = supermqPrefix + userListByGroup
identifyStream = supermqPrefix + userIdentify
resetTokenStream = supermqPrefix + generateResetToken
issueTokenStream = supermqPrefix + issueToken
refreshTokenStream = supermqPrefix + refreshToken
resetSecretStream = supermqPrefix + resetSecret
sendPasswordResetStream = supermqPrefix + sendPasswordReset
oauthStream = supermqPrefix + oauthCallback
addPolicyStream = supermqPrefix + addClientPolicy
deleteStream = supermqPrefix + deleteUser
)
var _ users.Service = (*eventStore)(nil)
@@ -26,7 +52,7 @@ type eventStore struct {
// NewEventStoreMiddleware returns wrapper around users service that sends
// events to event store.
func NewEventStoreMiddleware(ctx context.Context, svc users.Service, url string) (users.Service, error) {
publisher, err := store.NewPublisher(ctx, url, streamID)
publisher, err := store.NewPublisher(ctx, url)
if err != nil {
return nil, err
}
@@ -49,7 +75,7 @@ func (es *eventStore) Register(ctx context.Context, session authn.Session, user
middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, createStream, event); err != nil {
return user, err
}
@@ -62,7 +88,7 @@ func (es *eventStore) Update(ctx context.Context, session authn.Session, user us
return user, err
}
return es.update(ctx, session, "", user)
return es.update(ctx, session, userUpdate, updateStream, user)
}
func (es *eventStore) UpdateRole(ctx context.Context, session authn.Session, user users.User) (users.User, error) {
@@ -71,7 +97,7 @@ func (es *eventStore) UpdateRole(ctx context.Context, session authn.Session, use
return user, err
}
return es.update(ctx, session, "role", user)
return es.update(ctx, session, userUpdateRole, updateRoleStream, user)
}
func (es *eventStore) UpdateTags(ctx context.Context, session authn.Session, user users.User) (users.User, error) {
@@ -80,7 +106,7 @@ func (es *eventStore) UpdateTags(ctx context.Context, session authn.Session, use
return user, err
}
return es.update(ctx, session, "tags", user)
return es.update(ctx, session, userUpdateTags, updateTagsStream, user)
}
func (es *eventStore) UpdateSecret(ctx context.Context, session authn.Session, oldSecret, newSecret string) (users.User, error) {
@@ -89,7 +115,7 @@ func (es *eventStore) UpdateSecret(ctx context.Context, session authn.Session, o
return user, err
}
return es.update(ctx, session, "secret", user)
return es.update(ctx, session, userUpdateSecret, updateSecretStream, user)
}
func (es *eventStore) UpdateUsername(ctx context.Context, session authn.Session, id, username string) (users.User, error) {
@@ -104,7 +130,7 @@ func (es *eventStore) UpdateUsername(ctx context.Context, session authn.Session,
middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, updateUsernameStream, event); err != nil {
return user, err
}
@@ -123,7 +149,7 @@ func (es *eventStore) UpdateProfilePicture(ctx context.Context, session authn.Se
middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, updatePictureStream, event); err != nil {
return user, err
}
@@ -136,15 +162,15 @@ func (es *eventStore) UpdateEmail(ctx context.Context, session authn.Session, id
return user, err
}
return es.update(ctx, session, "email", user)
return es.update(ctx, session, userUpdateEmail, UpdateEmailStream, user)
}
func (es *eventStore) update(ctx context.Context, session authn.Session, operation string, user users.User) (users.User, error) {
func (es *eventStore) update(ctx context.Context, session authn.Session, operation, stream string, user users.User) (users.User, error) {
event := updateUserEvent{
user, operation, session, middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, stream, event); err != nil {
return user, err
}
@@ -163,7 +189,7 @@ func (es *eventStore) View(ctx context.Context, session authn.Session, id string
middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, viewStream, event); err != nil {
return user, err
}
@@ -182,7 +208,7 @@ func (es *eventStore) ViewProfile(ctx context.Context, session authn.Session) (u
middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, viewProfileStream, event); err != nil {
return user, err
}
@@ -200,7 +226,7 @@ func (es *eventStore) ListUsers(ctx context.Context, session authn.Session, pm u
middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, listStream, event); err != nil {
return cp, err
}
@@ -217,7 +243,7 @@ func (es *eventStore) SearchUsers(ctx context.Context, pm users.Page) (users.Use
middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, searchStream, event); err != nil {
return cp, err
}
@@ -230,7 +256,7 @@ func (es *eventStore) Enable(ctx context.Context, session authn.Session, id stri
return user, err
}
return es.delete(ctx, session, user)
return es.changeStatus(ctx, session, userEnable, enableStream, user)
}
func (es *eventStore) Disable(ctx context.Context, session authn.Session, id string) (users.User, error) {
@@ -239,12 +265,13 @@ func (es *eventStore) Disable(ctx context.Context, session authn.Session, id str
return user, err
}
return es.delete(ctx, session, user)
return es.changeStatus(ctx, session, userDisable, disableStream, user)
}
func (es *eventStore) delete(ctx context.Context, session authn.Session, user users.User) (users.User, error) {
event := removeUserEvent{
func (es *eventStore) changeStatus(ctx context.Context, session authn.Session, operation, stream string, user users.User) (users.User, error) {
event := changeUserStatusEvent{
id: user.ID,
operation: operation,
updatedAt: user.UpdatedAt,
updatedBy: user.UpdatedBy,
status: user.Status.String(),
@@ -252,7 +279,7 @@ func (es *eventStore) delete(ctx context.Context, session authn.Session, user us
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, stream, event); err != nil {
return user, err
}
@@ -270,7 +297,7 @@ func (es *eventStore) Identify(ctx context.Context, session authn.Session) (stri
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, identifyStream, event); err != nil {
return userID, err
}
@@ -289,7 +316,7 @@ func (es *eventStore) GenerateResetToken(ctx context.Context, email, host string
requestID: middleware.GetReqID(ctx),
}
return es.Publish(ctx, event)
return es.Publish(ctx, resetTokenStream, event)
}
func (es *eventStore) IssueToken(ctx context.Context, username, secret string) (*grpcTokenV1.Token, error) {
@@ -303,7 +330,7 @@ func (es *eventStore) IssueToken(ctx context.Context, username, secret string) (
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, issueTokenStream, event); err != nil {
return token, err
}
@@ -320,7 +347,7 @@ func (es *eventStore) RefreshToken(ctx context.Context, session authn.Session, r
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, refreshTokenStream, event); err != nil {
return token, err
}
@@ -336,7 +363,7 @@ func (es *eventStore) ResetSecret(ctx context.Context, session authn.Session, se
requestID: middleware.GetReqID(ctx),
}
return es.Publish(ctx, event)
return es.Publish(ctx, resetSecretStream, event)
}
func (es *eventStore) SendPasswordReset(ctx context.Context, host, email, user, token string) error {
@@ -351,7 +378,7 @@ func (es *eventStore) SendPasswordReset(ctx context.Context, host, email, user,
requestID: middleware.GetReqID(ctx),
}
return es.Publish(ctx, event)
return es.Publish(ctx, sendPasswordResetStream, event)
}
func (es *eventStore) OAuthCallback(ctx context.Context, user users.User) (users.User, error) {
@@ -365,7 +392,7 @@ func (es *eventStore) OAuthCallback(ctx context.Context, user users.User) (users
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, oauthStream, event); err != nil {
return token, err
}
@@ -383,7 +410,7 @@ func (es *eventStore) Delete(ctx context.Context, session authn.Session, id stri
requestID: middleware.GetReqID(ctx),
}
return es.Publish(ctx, event)
return es.Publish(ctx, deleteStream, event)
}
func (es *eventStore) OAuthAddUserPolicy(ctx context.Context, user users.User) error {
@@ -397,5 +424,5 @@ func (es *eventStore) OAuthAddUserPolicy(ctx context.Context, user users.User) e
requestID: middleware.GetReqID(ctx),
}
return es.Publish(ctx, event)
return es.Publish(ctx, addPolicyStream, event)
}