Files
magistrala/bootstrap/events/producer/streams.go
T
Steve Munene 683809dc6b
Property Based Tests / api-test (push) Has been cancelled
Continuous Delivery / lint-and-build (push) Has been cancelled
Deploy GitHub Pages / swagger-ui (push) Has been cancelled
CI Pipeline / Lint Proto (push) Has been cancelled
Continuous Delivery / Build and Push Docker Images (push) Has been cancelled
CI Pipeline / lint-and-build (push) Has been cancelled
CI Pipeline / Test ${{ matrix.module }} (push) Has been cancelled
CI Pipeline / Upload Coverage (push) Has been cancelled
CI Pipeline / Detect Changes (push) Has been cancelled
NOISSUE - Update bootstrap content format, update profile method and add profile search (#3515)
Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>
2026-05-19 09:02:45 +02:00

285 lines
8.1 KiB
Go

// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package producer
import (
"context"
"github.com/absmach/magistrala/bootstrap"
smqauthn "github.com/absmach/magistrala/pkg/authn"
"github.com/absmach/magistrala/pkg/events"
)
var _ bootstrap.Service = (*eventStore)(nil)
const (
magistralaPrefix = "magistrala."
createStream = magistralaPrefix + configCreate
listStream = magistralaPrefix + configList
removeStream = magistralaPrefix + configRemove
updateCertStream = magistralaPrefix + certUpdate
bootstrapStream = magistralaPrefix + clientBootstrap
enableConfigStream = magistralaPrefix + configEnable
disableConfigStream = magistralaPrefix + configDisable
createProfileStream = magistralaPrefix + profileCreate
viewProfileStream = magistralaPrefix + profileView
updateProfileStream = magistralaPrefix + profileUpdate
listProfilesStream = magistralaPrefix + profileList
deleteProfileStream = magistralaPrefix + profileDelete
assignProfileStream = magistralaPrefix + profileAssign
bindResourcesStream = magistralaPrefix + bindingsBind
listBindingsStream = magistralaPrefix + bindingsList
refreshBindingsStream = magistralaPrefix + bindingsRefresh
)
type eventStore struct {
events.Publisher
svc bootstrap.Service
}
// NewEventStoreMiddleware returns wrapper around bootstrap service that sends
// events to event store.
func NewEventStoreMiddleware(svc bootstrap.Service, publisher events.Publisher) bootstrap.Service {
return &eventStore{
svc: svc,
Publisher: publisher,
}
}
func (es *eventStore) Add(ctx context.Context, session smqauthn.Session, token string, cfg bootstrap.Config) (bootstrap.Config, error) {
saved, err := es.svc.Add(ctx, session, token, cfg)
if err != nil {
return saved, err
}
ev := configEvent{
saved, configCreate,
}
if err := es.Publish(ctx, createStream, ev); err != nil {
return saved, err
}
return saved, err
}
func (es *eventStore) View(ctx context.Context, session smqauthn.Session, id string) (bootstrap.Config, error) {
cfg, err := es.svc.View(ctx, session, id)
if err != nil {
return cfg, err
}
ev := configEvent{
cfg, configView,
}
if err := es.Publish(ctx, magistralaPrefix+configView, ev); err != nil {
return cfg, err
}
return cfg, err
}
func (es *eventStore) Update(ctx context.Context, session smqauthn.Session, cfg bootstrap.Config) error {
if err := es.svc.Update(ctx, session, cfg); err != nil {
return err
}
ev := configEvent{
cfg, configUpdate,
}
return es.Publish(ctx, magistralaPrefix+configUpdate, ev)
}
func (es eventStore) UpdateCert(ctx context.Context, session smqauthn.Session, id, clientCert, clientKey, caCert string) (bootstrap.Config, error) {
cfg, err := es.svc.UpdateCert(ctx, session, id, clientCert, clientKey, caCert)
if err != nil {
return cfg, err
}
ev := updateCertEvent{
configID: id,
clientCert: clientCert,
clientKey: clientKey,
caCert: caCert,
}
if err := es.Publish(ctx, updateCertStream, ev); err != nil {
return cfg, err
}
return cfg, nil
}
func (es *eventStore) List(ctx context.Context, session smqauthn.Session, filter bootstrap.Filter, offset, limit uint64) (bootstrap.ConfigsPage, error) {
bp, err := es.svc.List(ctx, session, filter, offset, limit)
if err != nil {
return bp, err
}
ev := listConfigsEvent{
offset: offset,
limit: limit,
fullMatch: filter.FullMatch,
partialMatch: filter.PartialMatch,
}
if err := es.Publish(ctx, listStream, ev); err != nil {
return bp, err
}
return bp, nil
}
func (es *eventStore) Remove(ctx context.Context, session smqauthn.Session, id string) error {
if err := es.svc.Remove(ctx, session, id); err != nil {
return err
}
ev := removeConfigEvent{
config: id,
}
return es.Publish(ctx, removeStream, ev)
}
func (es *eventStore) Bootstrap(ctx context.Context, externalKey, externalID string, secure bool) (bootstrap.Config, error) {
cfg, err := es.svc.Bootstrap(ctx, externalKey, externalID, secure)
ev := bootstrapEvent{
cfg,
externalID,
true,
}
if err != nil {
ev.success = false
}
if err := es.Publish(ctx, bootstrapStream, ev); err != nil {
return cfg, err
}
return cfg, err
}
func (es *eventStore) EnableConfig(ctx context.Context, session smqauthn.Session, id string) (bootstrap.Config, error) {
cfg, err := es.svc.EnableConfig(ctx, session, id)
if err != nil {
return cfg, err
}
ev := enableConfigEvent{configID: id}
if err := es.Publish(ctx, enableConfigStream, ev); err != nil {
return cfg, err
}
return cfg, nil
}
func (es *eventStore) DisableConfig(ctx context.Context, session smqauthn.Session, id string) (bootstrap.Config, error) {
cfg, err := es.svc.DisableConfig(ctx, session, id)
if err != nil {
return cfg, err
}
ev := disableConfigEvent{configID: id}
if err := es.Publish(ctx, disableConfigStream, ev); err != nil {
return cfg, err
}
return cfg, nil
}
func (es *eventStore) CreateProfile(ctx context.Context, session smqauthn.Session, p bootstrap.Profile) (bootstrap.Profile, error) {
saved, err := es.svc.CreateProfile(ctx, session, p)
if err != nil {
return saved, err
}
ev := profileEvent{saved, profileCreate}
if err := es.Publish(ctx, createProfileStream, ev); err != nil {
return saved, err
}
return saved, nil
}
func (es *eventStore) ViewProfile(ctx context.Context, session smqauthn.Session, profileID string) (bootstrap.Profile, error) {
p, err := es.svc.ViewProfile(ctx, session, profileID)
if err != nil {
return p, err
}
ev := profileEvent{p, profileView}
if err := es.Publish(ctx, viewProfileStream, ev); err != nil {
return p, err
}
return p, nil
}
func (es *eventStore) UpdateProfile(ctx context.Context, session smqauthn.Session, p bootstrap.Profile) (bootstrap.Profile, error) {
updated, err := es.svc.UpdateProfile(ctx, session, p)
if err != nil {
return bootstrap.Profile{}, err
}
ev := profileEvent{updated, profileUpdate}
return updated, es.Publish(ctx, updateProfileStream, ev)
}
func (es *eventStore) ListProfiles(ctx context.Context, session smqauthn.Session, offset, limit uint64, name string) (bootstrap.ProfilesPage, error) {
pp, err := es.svc.ListProfiles(ctx, session, offset, limit, name)
if err != nil {
return pp, err
}
ev := profileEvent{operation: profileList}
if err := es.Publish(ctx, listProfilesStream, ev); err != nil {
return pp, err
}
return pp, nil
}
func (es *eventStore) DeleteProfile(ctx context.Context, session smqauthn.Session, profileID string) error {
if err := es.svc.DeleteProfile(ctx, session, profileID); err != nil {
return err
}
ev := deleteProfileEvent{profileID: profileID}
return es.Publish(ctx, deleteProfileStream, ev)
}
func (es *eventStore) AssignProfile(ctx context.Context, session smqauthn.Session, configID, profileID string) error {
if err := es.svc.AssignProfile(ctx, session, configID, profileID); err != nil {
return err
}
ev := assignProfileEvent{configID: configID, profileID: profileID}
return es.Publish(ctx, assignProfileStream, ev)
}
func (es *eventStore) BindResources(ctx context.Context, session smqauthn.Session, token, configID string, bindings []bootstrap.BindingRequest) error {
if err := es.svc.BindResources(ctx, session, token, configID, bindings); err != nil {
return err
}
slots := make([]string, len(bindings))
for i, b := range bindings {
slots[i] = b.Slot
}
ev := bindResourcesEvent{configID: configID, slots: slots}
return es.Publish(ctx, bindResourcesStream, ev)
}
func (es *eventStore) ListBindings(ctx context.Context, session smqauthn.Session, configID string) ([]bootstrap.BindingSnapshot, error) {
bs, err := es.svc.ListBindings(ctx, session, configID)
if err != nil {
return bs, err
}
ev := listBindingsEvent{configID: configID}
if err := es.Publish(ctx, listBindingsStream, ev); err != nil {
return bs, err
}
return bs, nil
}
func (es *eventStore) RefreshBindings(ctx context.Context, session smqauthn.Session, token, configID string) error {
if err := es.svc.RefreshBindings(ctx, session, token, configID); err != nil {
return err
}
ev := refreshBindingsEvent{configID: configID}
return es.Publish(ctx, refreshBindingsStream, ev)
}