SMQ-2944 - Add client events consumer (#2947)

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>
This commit is contained in:
Felix Gateru
2025-09-16 18:21:39 +03:00
committed by GitHub
parent 8d9e92f228
commit cd9cb9e1d5
4 changed files with 419 additions and 0 deletions
+220
View File
@@ -0,0 +1,220 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package consumer
import (
"time"
"github.com/absmach/supermq/clients"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/roles"
rconsumer "github.com/absmach/supermq/pkg/roles/rolemanager/events/consumer"
)
const layout = "2006-01-02T15:04:05.999999Z"
var (
errDecodeCreateClientEvent = errors.New("failed to decode client create event")
errDecodeUpdateClientEvent = errors.New("failed to decode client update event")
errDecodeChangeStatusClientEvent = errors.New("failed to decode client change status event")
errDecodeRemoveClientEvent = errors.New("failed to decode client remove event")
errDecodeSetParentGroupEvent = errors.New("failed to decode client set parent event")
errDecodeRemoveParentGroupEvent = errors.New("failed to decode client remove parent event")
errID = errors.New("missing or invalid 'id'")
errDomain = errors.New("missing or invalid 'domain'")
errStatus = errors.New("missing or invalid 'status'")
errTags = errors.New("invalid 'tags'")
errConvertStatus = errors.New("failed to convert status")
errCreatedAt = errors.New("failed to parse 'created_at' time")
errUpdatedAt = errors.New("failed to parse 'updated_at' time")
)
func ToClient(data map[string]any) (clients.Client, error) {
var c clients.Client
id, ok := data["id"].(string)
if !ok {
return clients.Client{}, errID
}
c.ID = id
dom, ok := data["domain"].(string)
if !ok {
return clients.Client{}, errDomain
}
c.Domain = dom
st, ok := data["status"].(string)
if !ok {
return clients.Client{}, errStatus
}
status, err := clients.ToStatus(st)
if err != nil {
return clients.Client{}, errConvertStatus
}
c.Status = status
cat, ok := data["created_at"].(string)
if !ok {
return clients.Client{}, errCreatedAt
}
ct, err := time.Parse(layout, cat)
if err != nil {
return clients.Client{}, errors.Wrap(errCreatedAt, err)
}
c.CreatedAt = ct
// Following fields of clients are allowed to be empty.
name, ok := data["name"].(string)
if ok {
c.Name = name
}
identity, ok := data["identity"].(string)
if ok {
c.Identity = identity
}
parent, ok := data["parent_group_id"].(string)
if ok {
c.ParentGroup = parent
}
itags, ok := data["tags"].([]any)
if ok {
tags, err := rconsumer.ToStrings(itags)
if err != nil {
return clients.Client{}, errors.Wrap(errTags, err)
}
c.Tags = tags
}
meta, ok := data["metadata"].(map[string]any)
if ok {
c.Metadata = meta
}
uby, ok := data["updated_by"].(string)
if ok {
c.UpdatedBy = uby
}
uat, ok := data["updated_at"].(string)
if ok {
ut, err := time.Parse(layout, uat)
if err != nil {
return clients.Client{}, errors.Wrap(errUpdatedAt, err)
}
c.UpdatedAt = ut
}
return c, nil
}
func decodeCreateClientEvent(data map[string]any) (clients.Client, []roles.RoleProvision, error) {
c, err := ToClient(data)
if err != nil {
return clients.Client{}, []roles.RoleProvision{}, errors.Wrap(errDecodeCreateClientEvent, err)
}
irps, ok := data["roles_provisioned"].([]any)
if !ok {
return clients.Client{}, []roles.RoleProvision{}, errors.Wrap(errDecodeCreateClientEvent, errors.New("missing or invalid 'roles_provisioned'"))
}
rps, err := rconsumer.ToRoleProvisions(irps)
if err != nil {
return clients.Client{}, []roles.RoleProvision{}, errors.Wrap(errDecodeCreateClientEvent, err)
}
return c, rps, nil
}
func decodeUpdateClientEvent(data map[string]any) (clients.Client, error) {
c, err := ToClient(data)
if err != nil {
return clients.Client{}, errors.Wrap(errDecodeUpdateClientEvent, err)
}
return c, nil
}
func decodeChangeStatusClientEvent(data map[string]any) (clients.Client, error) {
c, err := ToClientStatus(data)
if err != nil {
return clients.Client{}, errors.Wrap(errDecodeChangeStatusClientEvent, err)
}
return c, nil
}
func ToClientStatus(data map[string]any) (clients.Client, error) {
var c clients.Client
id, ok := data["id"].(string)
if !ok {
return clients.Client{}, errID
}
c.ID = id
stat, ok := data["status"].(string)
if !ok {
return clients.Client{}, errStatus
}
st, err := clients.ToStatus(stat)
if err != nil {
return clients.Client{}, errors.Wrap(errConvertStatus, err)
}
c.Status = st
uat, ok := data["updated_at"].(string)
if ok {
ut, err := time.Parse(layout, uat)
if err != nil {
return clients.Client{}, errors.Wrap(errUpdatedAt, err)
}
c.UpdatedAt = ut
}
uby, ok := data["updated_by"].(string)
if ok {
c.UpdatedBy = uby
}
return c, nil
}
func decodeRemoveClientEvent(data map[string]any) (clients.Client, error) {
var c clients.Client
id, ok := data["id"].(string)
if !ok {
return clients.Client{}, errors.Wrap(errDecodeRemoveClientEvent, errID)
}
c.ID = id
return c, nil
}
func decodeSetParentGroupEvent(data map[string]any) (clients.Client, error) {
id, ok := data["id"].(string)
if !ok {
return clients.Client{}, errors.Wrap(errDecodeSetParentGroupEvent, errID)
}
parent, ok := data["parent_group_id"].(string)
if !ok {
return clients.Client{}, errors.Wrap(errDecodeSetParentGroupEvent, errID)
}
return clients.Client{
ID: id,
ParentGroup: parent,
}, nil
}
func decodeRemoveParentGroupEvent(data map[string]any) (clients.Client, error) {
id, ok := data["id"].(string)
if !ok {
return clients.Client{}, errors.Wrap(errDecodeRemoveParentGroupEvent, errID)
}
return clients.Client{
ID: id,
}, nil
}
+6
View File
@@ -0,0 +1,6 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package consumer contains events consumer for events
// published by clients service.
package consumer
+187
View File
@@ -0,0 +1,187 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package consumer
import (
"context"
"log/slog"
"github.com/absmach/supermq/clients"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/events"
"github.com/absmach/supermq/pkg/events/store"
rconsumer "github.com/absmach/supermq/pkg/roles/rolemanager/events/consumer"
)
const (
stream = "events.supermq.client.*"
create = "client.create"
update = "client.update"
updateTags = "client.update_tags"
enable = "client.enable"
disable = "client.disable"
remove = "client.remove"
setParentGroup = "client.set_parent"
removeParentGroup = "client.remove_parent"
)
var (
errNoOperationKey = errors.New("operation key is not found in event message")
errCreateClientEvent = errors.New("failed to consume client create event")
errUpdateClientEvent = errors.New("failed to consume client update event")
errChangeStatusClientEvent = errors.New("failed to consume client change status event")
errRemoveClientEvent = errors.New("failed to consume client remove event")
errSetParentGroupEvent = errors.New("failed to consume client add parent group event")
errRemoveParentGroupEvent = errors.New("failed to consume client remove parent group event")
)
type eventHandler struct {
repo clients.Repository
rolesEventHandler rconsumer.EventHandler
}
func ClientsEventsSubscribe(ctx context.Context, repo clients.Repository, esURL, esConsumerName string, logger *slog.Logger) error {
subscriber, err := store.NewSubscriber(ctx, esURL, logger)
if err != nil {
return err
}
subConfig := events.SubscriberConfig{
Stream: stream,
Consumer: esConsumerName,
Handler: NewEventHandler(repo),
Ordered: true,
}
return subscriber.Subscribe(ctx, subConfig)
}
// NewEventHandler returns new event store handler.
func NewEventHandler(repo clients.Repository) events.EventHandler {
reh := rconsumer.NewEventHandler("client", repo)
return &eventHandler{
repo: repo,
rolesEventHandler: reh,
}
}
func (es *eventHandler) Handle(ctx context.Context, event events.Event) error {
msg, err := event.Encode()
if err != nil {
return err
}
op, ok := msg["operation"]
if !ok {
return errNoOperationKey
}
switch op {
case create:
return es.createClientHandler(ctx, msg)
case update:
return es.updateClientHandler(ctx, msg)
case updateTags:
return es.updateClientTagsHandler(ctx, msg)
case enable, disable:
return es.changeStatusClientHandler(ctx, msg)
case remove:
return es.removeClientHandler(ctx, msg)
case setParentGroup:
return es.setParentGroupHandler(ctx, msg)
case removeParentGroup:
return es.removeParentGroupHandler(ctx, msg)
}
return es.rolesEventHandler.Handle(ctx, op, msg)
}
func (es *eventHandler) createClientHandler(ctx context.Context, data map[string]any) error {
c, rps, err := decodeCreateClientEvent(data)
if err != nil {
return errors.Wrap(errCreateClientEvent, err)
}
if _, err := es.repo.Save(ctx, c); err != nil {
return errors.Wrap(errCreateClientEvent, err)
}
if _, err := es.repo.AddRoles(ctx, rps); err != nil {
return errors.Wrap(errCreateClientEvent, err)
}
return nil
}
func (es *eventHandler) updateClientHandler(ctx context.Context, data map[string]any) error {
c, err := decodeUpdateClientEvent(data)
if err != nil {
return errors.Wrap(errUpdateClientEvent, err)
}
if _, err := es.repo.Update(ctx, c); err != nil {
return errors.Wrap(errUpdateClientEvent, err)
}
return nil
}
func (es *eventHandler) updateClientTagsHandler(ctx context.Context, data map[string]any) error {
c, err := decodeUpdateClientEvent(data)
if err != nil {
return errors.Wrap(errUpdateClientEvent, err)
}
if _, err := es.repo.UpdateTags(ctx, c); err != nil {
return errors.Wrap(errUpdateClientEvent, err)
}
return nil
}
func (es *eventHandler) changeStatusClientHandler(ctx context.Context, data map[string]any) error {
c, err := decodeChangeStatusClientEvent(data)
if err != nil {
return errors.Wrap(errChangeStatusClientEvent, err)
}
if _, err := es.repo.ChangeStatus(ctx, c); err != nil {
return errors.Wrap(errChangeStatusClientEvent, err)
}
return nil
}
func (es *eventHandler) removeClientHandler(ctx context.Context, data map[string]any) error {
c, err := decodeRemoveClientEvent(data)
if err != nil {
return errors.Wrap(errRemoveClientEvent, err)
}
if err := es.repo.Delete(ctx, c.ID); err != nil {
return errors.Wrap(errRemoveClientEvent, err)
}
return nil
}
func (es *eventHandler) setParentGroupHandler(ctx context.Context, data map[string]any) error {
c, err := decodeSetParentGroupEvent(data)
if err != nil {
return errors.Wrap(errSetParentGroupEvent, err)
}
if err := es.repo.SetParentGroup(ctx, c); err != nil {
return errors.Wrap(errSetParentGroupEvent, err)
}
return nil
}
func (es *eventHandler) removeParentGroupHandler(ctx context.Context, data map[string]any) error {
c, err := decodeRemoveParentGroupEvent(data)
if err != nil {
return errors.Wrap(errRemoveParentGroupEvent, err)
}
if err := es.repo.RemoveParentGroup(ctx, c); err != nil {
return errors.Wrap(errRemoveParentGroupEvent, err)
}
return nil
}
+6
View File
@@ -0,0 +1,6 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package events provides the events sourcing of clients to
// provide replication in other service and definitions needed to support it
package events