SMQ-2945 - Add channels events consumer (#2951)

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>
This commit is contained in:
Felix Gateru
2025-09-16 18:23:01 +03:00
committed by GitHub
parent cd9cb9e1d5
commit 1f48a73677
4 changed files with 526 additions and 0 deletions
+297
View File
@@ -0,0 +1,297 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package consumer
import (
"time"
"github.com/absmach/supermq/channels"
"github.com/absmach/supermq/pkg/connections"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/roles"
rconsumer "github.com/absmach/supermq/pkg/roles/rolemanager/events/consumer"
)
const layout = "2006-01-02T15:04:05.999999Z"
var (
errDecodeCreateChannelEvent = errors.New("failed to decode channel create event")
errDecodeUpdateChannelEvent = errors.New("failed to decode channel update event")
errDecodeChangeStatusChannelEvent = errors.New("failed to decode channel change status event")
errDecodeRemoveChannelEvent = errors.New("failed to decode channel remove event")
errDecodeSetParentGroupEvent = errors.New("failed to decode channel set parent event")
errDecodeRemoveParentGroupEvent = errors.New("failed to decode channel remove parent event")
errDecodeConnectEvent = errors.New("failed to decode channel connect event")
errDeocodeDisconnectEvent = errors.New("failed to decode channel disconnect event")
errID = errors.New("missing or invalid 'id'")
errDomain = errors.New("missing or invalid 'domain'")
errStatus = errors.New("missing or invalid 'status'")
errTags = errors.New("invalid 'tags'")
errConvertStatus = errors.New("failed to convert status")
errChannelIDs = errors.New("missing or invalid 'channel_ids' in connection")
errClientIDs = errors.New("missing or invalid 'client_ids' in connection")
errConnType = errors.New("missing or invalid 'type' in connection")
errCreatedAt = errors.New("failed to parse 'created_at' time")
errUpdatedAt = errors.New("failed to parse 'updated_at' time")
)
func ToChannel(data map[string]any) (channels.Channel, error) {
var c channels.Channel
id, ok := data["id"].(string)
if !ok {
return channels.Channel{}, errID
}
c.ID = id
dom, ok := data["domain"].(string)
if !ok {
return channels.Channel{}, errDomain
}
c.Domain = dom
st, ok := data["status"].(string)
if !ok {
return channels.Channel{}, errStatus
}
status, err := channels.ToStatus(st)
if err != nil {
return channels.Channel{}, errConvertStatus
}
c.Status = status
cat, ok := data["created_at"].(string)
if !ok {
return channels.Channel{}, errCreatedAt
}
ct, err := time.Parse(layout, cat)
if err != nil {
return channels.Channel{}, errors.Wrap(errCreatedAt, err)
}
c.CreatedAt = ct
// Following fields of channels are allowed to be empty.
name, ok := data["name"].(string)
if ok {
c.Name = name
}
parent, ok := data["parent_group_id"].(string)
if ok {
c.ParentGroup = parent
}
itags, ok := data["tags"].([]any)
if ok {
tags, err := rconsumer.ToStrings(itags)
if err != nil {
return channels.Channel{}, errors.Wrap(errTags, err)
}
c.Tags = tags
}
meta, ok := data["metadata"].(map[string]any)
if ok {
c.Metadata = meta
}
uby, ok := data["updated_by"].(string)
if ok {
c.UpdatedBy = uby
}
uat, ok := data["updated_at"].(string)
if ok {
ut, err := time.Parse(layout, uat)
if err != nil {
return channels.Channel{}, errors.Wrap(errUpdatedAt, err)
}
c.UpdatedAt = ut
}
return c, nil
}
func ToConnections(data map[string]any) ([]channels.Connection, error) {
var connTypes []connections.ConnType
domain, ok := data["domain"].(string)
if !ok {
return nil, errDomain
}
ityp, ok := data["types"].([]any)
if !ok {
return nil, errConnType
}
typs, err := rconsumer.ToStrings(ityp)
if err != nil {
return nil, errors.Wrap(errConnType, err)
}
for _, typ := range typs {
connType, err := connections.ParseConnType(typ)
if err != nil {
return nil, errors.Wrap(errConnType, err)
}
connTypes = append(connTypes, connType)
}
ichanIDs, ok := data["channel_ids"].([]any)
if !ok {
return []channels.Connection{}, errChannelIDs
}
channelIDs, err := rconsumer.ToStrings(ichanIDs)
if err != nil {
return []channels.Connection{}, errors.Wrap(errChannelIDs, err)
}
iclIDs, ok := data["client_ids"].([]any)
if !ok {
return []channels.Connection{}, errClientIDs
}
clientIDs, err := rconsumer.ToStrings(iclIDs)
if err != nil {
return []channels.Connection{}, errors.Wrap(errClientIDs, err)
}
var conns []channels.Connection
for _, chanID := range channelIDs {
for _, clientID := range clientIDs {
for _, connType := range connTypes {
conns = append(conns, channels.Connection{
ChannelID: chanID,
ClientID: clientID,
Type: connType,
DomainID: domain,
})
}
}
}
return conns, nil
}
func decodeCreateChannelEvent(data map[string]any) (channels.Channel, []roles.RoleProvision, error) {
c, err := ToChannel(data)
if err != nil {
return channels.Channel{}, []roles.RoleProvision{}, errors.Wrap(errDecodeCreateChannelEvent, err)
}
irps, ok := data["roles_provisioned"].([]any)
if !ok {
return channels.Channel{}, []roles.RoleProvision{}, errors.Wrap(errDecodeCreateChannelEvent, errors.New("missing or invalid 'roles_provisioned'"))
}
rps, err := rconsumer.ToRoleProvisions(irps)
if err != nil {
return channels.Channel{}, []roles.RoleProvision{}, errors.Wrap(errDecodeCreateChannelEvent, err)
}
return c, rps, nil
}
func decodeUpdateChannelEvent(data map[string]any) (channels.Channel, error) {
c, err := ToChannel(data)
if err != nil {
return channels.Channel{}, errors.Wrap(errDecodeUpdateChannelEvent, err)
}
return c, nil
}
func decodeChangeStatusChannelEvent(data map[string]any) (channels.Channel, error) {
c, err := ToChannelStatus(data)
if err != nil {
return channels.Channel{}, errors.Wrap(errDecodeChangeStatusChannelEvent, err)
}
return c, nil
}
func ToChannelStatus(data map[string]any) (channels.Channel, error) {
var c channels.Channel
id, ok := data["id"].(string)
if !ok {
return channels.Channel{}, errID
}
c.ID = id
stat, ok := data["status"].(string)
if !ok {
return channels.Channel{}, errStatus
}
st, err := channels.ToStatus(stat)
if err != nil {
return channels.Channel{}, errors.Wrap(errConvertStatus, err)
}
c.Status = st
uat, ok := data["updated_at"].(string)
if ok {
ut, err := time.Parse(layout, uat)
if err != nil {
return channels.Channel{}, errors.Wrap(errUpdatedAt, err)
}
c.UpdatedAt = ut
}
uby, ok := data["updated_by"].(string)
if ok {
c.UpdatedBy = uby
}
return c, nil
}
func decodeRemoveChannelEvent(data map[string]any) (channels.Channel, error) {
var c channels.Channel
id, ok := data["id"].(string)
if !ok {
return channels.Channel{}, errors.Wrap(errDecodeRemoveChannelEvent, errID)
}
c.ID = id
return c, nil
}
func decodeConnectEvent(data map[string]any) ([]channels.Connection, error) {
conns, err := ToConnections(data)
if err != nil {
return []channels.Connection{}, errors.Wrap(errDecodeConnectEvent, err)
}
return conns, nil
}
func decodeDisconnectEvent(data map[string]any) ([]channels.Connection, error) {
conns, err := ToConnections(data)
if err != nil {
return []channels.Connection{}, errors.Wrap(errDeocodeDisconnectEvent, err)
}
return conns, nil
}
func decodeSetParentGroupEvent(data map[string]any) (channels.Channel, error) {
id, ok := data["id"].(string)
if !ok {
return channels.Channel{}, errors.Wrap(errDecodeSetParentGroupEvent, errID)
}
parent, ok := data["parent_group_id"].(string)
if !ok {
return channels.Channel{}, errors.Wrap(errDecodeSetParentGroupEvent, errID)
}
return channels.Channel{
ID: id,
ParentGroup: parent,
}, nil
}
func decodeRemoveParentGroupEvent(data map[string]any) (channels.Channel, error) {
id, ok := data["id"].(string)
if !ok {
return channels.Channel{}, errors.Wrap(errDecodeRemoveParentGroupEvent, errID)
}
return channels.Channel{
ID: id,
}, nil
}
+6
View File
@@ -0,0 +1,6 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package consumer contains events consumer for events
// published by channels service.
package consumer
+217
View File
@@ -0,0 +1,217 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package consumer
import (
"context"
"log/slog"
"github.com/absmach/supermq/channels"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/events"
"github.com/absmach/supermq/pkg/events/store"
rconsumer "github.com/absmach/supermq/pkg/roles/rolemanager/events/consumer"
)
const (
stream = "events.supermq.channel.*"
create = "channel.create"
update = "channel.update"
updateTags = "channel.update_tags"
enable = "channel.enable"
disable = "channel.disable"
remove = "channel.remove"
connect = "channel.connect"
disconnect = "channel.disconnect"
setParentGroup = "channel.set_parent"
removeParentGroup = "channel.remove_parent"
)
var (
errNoOperationKey = errors.New("operation key is not found in event message")
errCreateChannelEvent = errors.New("failed to consume channel create event")
errUpdateChannelEvent = errors.New("failed to consume channel update event")
errChangeStatusChannelEvent = errors.New("failed to consume channel change status event")
errRemoveChannelEvent = errors.New("failed to consume channel remove event")
errConnectEvent = errors.New("failed to consume channel connect event")
errDisconnectEvent = errors.New("failed to consume channel disconnect event")
errSetParentGroupEvent = errors.New("failed to consume channel add parent group event")
errRemoveParentGroupEvent = errors.New("failed to consume channel remove parent group event")
)
type eventHandler struct {
repo channels.Repository
rolesEventHandler rconsumer.EventHandler
}
func ChannelsEventsSubscribe(ctx context.Context, repo channels.Repository, esURL, esConsumerName string, logger *slog.Logger) error {
subscriber, err := store.NewSubscriber(ctx, esURL, logger)
if err != nil {
return err
}
subConfig := events.SubscriberConfig{
Stream: stream,
Consumer: esConsumerName,
Handler: NewEventHandler(repo),
Ordered: true,
}
return subscriber.Subscribe(ctx, subConfig)
}
// NewEventHandler returns new event store handler.
func NewEventHandler(repo channels.Repository) events.EventHandler {
reh := rconsumer.NewEventHandler("channel", repo)
return &eventHandler{
repo: repo,
rolesEventHandler: reh,
}
}
func (es *eventHandler) Handle(ctx context.Context, event events.Event) error {
msg, err := event.Encode()
if err != nil {
return err
}
op, ok := msg["operation"]
if !ok {
return errNoOperationKey
}
switch op {
case create:
return es.createChannelHandler(ctx, msg)
case update:
return es.updateChannelHandler(ctx, msg)
case updateTags:
return es.updateChannelTagsHandler(ctx, msg)
case enable, disable:
return es.changeStatusChannelHandler(ctx, msg)
case remove:
return es.removeChannelHandler(ctx, msg)
case connect:
return es.connectChannelHandler(ctx, msg)
case disconnect:
return es.disconnectChannelHandler(ctx, msg)
case setParentGroup:
return es.setParentGroupHandler(ctx, msg)
case removeParentGroup:
return es.removeParentGroupHandler(ctx, msg)
}
return es.rolesEventHandler.Handle(ctx, op, msg)
}
func (es *eventHandler) createChannelHandler(ctx context.Context, data map[string]any) error {
c, rps, err := decodeCreateChannelEvent(data)
if err != nil {
return errors.Wrap(errCreateChannelEvent, err)
}
if _, err := es.repo.Save(ctx, c); err != nil {
return errors.Wrap(errCreateChannelEvent, err)
}
if _, err := es.repo.AddRoles(ctx, rps); err != nil {
return errors.Wrap(errCreateChannelEvent, err)
}
return nil
}
func (es *eventHandler) updateChannelHandler(ctx context.Context, data map[string]any) error {
c, err := decodeUpdateChannelEvent(data)
if err != nil {
return errors.Wrap(errUpdateChannelEvent, err)
}
if _, err := es.repo.Update(ctx, c); err != nil {
return errors.Wrap(errUpdateChannelEvent, err)
}
return nil
}
func (es *eventHandler) updateChannelTagsHandler(ctx context.Context, data map[string]any) error {
c, err := decodeUpdateChannelEvent(data)
if err != nil {
return errors.Wrap(errUpdateChannelEvent, err)
}
if _, err := es.repo.UpdateTags(ctx, c); err != nil {
return errors.Wrap(errUpdateChannelEvent, err)
}
return nil
}
func (es *eventHandler) changeStatusChannelHandler(ctx context.Context, data map[string]any) error {
c, err := decodeChangeStatusChannelEvent(data)
if err != nil {
return errors.Wrap(errChangeStatusChannelEvent, err)
}
if _, err := es.repo.ChangeStatus(ctx, c); err != nil {
return errors.Wrap(errChangeStatusChannelEvent, err)
}
return nil
}
func (es *eventHandler) removeChannelHandler(ctx context.Context, data map[string]any) error {
c, err := decodeRemoveChannelEvent(data)
if err != nil {
return errors.Wrap(errRemoveChannelEvent, err)
}
if err := es.repo.Remove(ctx, c.ID); err != nil {
return errors.Wrap(errRemoveChannelEvent, err)
}
return nil
}
func (es *eventHandler) connectChannelHandler(ctx context.Context, data map[string]any) error {
c, err := decodeConnectEvent(data)
if err != nil {
return errors.Wrap(errConnectEvent, err)
}
if err := es.repo.AddConnections(ctx, c); err != nil {
return errors.Wrap(errConnectEvent, err)
}
return nil
}
func (es *eventHandler) disconnectChannelHandler(ctx context.Context, data map[string]any) error {
c, err := decodeDisconnectEvent(data)
if err != nil {
return errors.Wrap(errDisconnectEvent, err)
}
if err := es.repo.RemoveConnections(ctx, c); err != nil {
return errors.Wrap(errDisconnectEvent, err)
}
return nil
}
func (es *eventHandler) setParentGroupHandler(ctx context.Context, data map[string]any) error {
c, err := decodeSetParentGroupEvent(data)
if err != nil {
return errors.Wrap(errSetParentGroupEvent, err)
}
if err := es.repo.SetParentGroup(ctx, c); err != nil {
return errors.Wrap(errSetParentGroupEvent, err)
}
return nil
}
func (es *eventHandler) removeParentGroupHandler(ctx context.Context, data map[string]any) error {
c, err := decodeRemoveParentGroupEvent(data)
if err != nil {
return errors.Wrap(errRemoveParentGroupEvent, err)
}
if err := es.repo.RemoveParentGroup(ctx, c); err != nil {
return errors.Wrap(errRemoveParentGroupEvent, err)
}
return nil
}
+6
View File
@@ -0,0 +1,6 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package events provides the events sourcing of channels to
// provide replication in other service and definitions needed to support it
package events