mirror of
https://github.com/absmach/magistrala.git
synced 2026-06-23 04:10:28 +00:00
7f03134d8e
Property Based Tests / api-test (push) Has been cancelled
Continuous Delivery / lint-and-build (push) Has been cancelled
Deploy GitHub Pages / swagger-ui (push) Has been cancelled
CI Pipeline / Lint Proto (push) Has been cancelled
CI Pipeline / Detect Changes (push) Has been cancelled
Continuous Delivery / Build and Push Docker Images (push) Has been cancelled
CI Pipeline / lint-and-build (push) Has been cancelled
CI Pipeline / Test ${{ matrix.module }} (push) Has been cancelled
CI Pipeline / Upload Coverage (push) Has been cancelled
Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> Signed-off-by: JeffMboya <jangina.mboya@gmail.com> Co-authored-by: JeffMboya <jangina.mboya@gmail.com>
284 lines
8.0 KiB
Go
284 lines
8.0 KiB
Go
// Copyright (c) Abstract Machines
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package producer
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/absmach/magistrala/bootstrap"
|
|
smqauthn "github.com/absmach/magistrala/pkg/authn"
|
|
"github.com/absmach/magistrala/pkg/events"
|
|
)
|
|
|
|
// Compile-time assertion that *eventStore implements bootstrap.Service.
var _ bootstrap.Service = (*eventStore)(nil)
|
|
|
|
const (
|
|
magistralaPrefix = "magistrala."
|
|
createStream = magistralaPrefix + configCreate
|
|
listStream = magistralaPrefix + configList
|
|
removeStream = magistralaPrefix + configRemove
|
|
updateCertStream = magistralaPrefix + certUpdate
|
|
bootstrapStream = magistralaPrefix + clientBootstrap
|
|
enableConfigStream = magistralaPrefix + configEnable
|
|
disableConfigStream = magistralaPrefix + configDisable
|
|
createProfileStream = magistralaPrefix + profileCreate
|
|
viewProfileStream = magistralaPrefix + profileView
|
|
updateProfileStream = magistralaPrefix + profileUpdate
|
|
listProfilesStream = magistralaPrefix + profileList
|
|
deleteProfileStream = magistralaPrefix + profileDelete
|
|
assignProfileStream = magistralaPrefix + profileAssign
|
|
bindResourcesStream = magistralaPrefix + bindingsBind
|
|
listBindingsStream = magistralaPrefix + bindingsList
|
|
refreshBindingsStream = magistralaPrefix + bindingsRefresh
|
|
)
|
|
|
|
type eventStore struct {
|
|
events.Publisher
|
|
svc bootstrap.Service
|
|
}
|
|
|
|
// NewEventStoreMiddleware returns wrapper around bootstrap service that sends
|
|
// events to event store.
|
|
func NewEventStoreMiddleware(svc bootstrap.Service, publisher events.Publisher) bootstrap.Service {
|
|
return &eventStore{
|
|
svc: svc,
|
|
Publisher: publisher,
|
|
}
|
|
}
|
|
|
|
func (es *eventStore) Add(ctx context.Context, session smqauthn.Session, token string, cfg bootstrap.Config) (bootstrap.Config, error) {
|
|
saved, err := es.svc.Add(ctx, session, token, cfg)
|
|
if err != nil {
|
|
return saved, err
|
|
}
|
|
|
|
ev := configEvent{
|
|
saved, configCreate,
|
|
}
|
|
|
|
if err := es.Publish(ctx, createStream, ev); err != nil {
|
|
return saved, err
|
|
}
|
|
|
|
return saved, err
|
|
}
|
|
|
|
func (es *eventStore) View(ctx context.Context, session smqauthn.Session, id string) (bootstrap.Config, error) {
|
|
cfg, err := es.svc.View(ctx, session, id)
|
|
if err != nil {
|
|
return cfg, err
|
|
}
|
|
ev := configEvent{
|
|
cfg, configView,
|
|
}
|
|
|
|
if err := es.Publish(ctx, magistralaPrefix+configView, ev); err != nil {
|
|
return cfg, err
|
|
}
|
|
|
|
return cfg, err
|
|
}
|
|
|
|
func (es *eventStore) Update(ctx context.Context, session smqauthn.Session, cfg bootstrap.Config) error {
|
|
if err := es.svc.Update(ctx, session, cfg); err != nil {
|
|
return err
|
|
}
|
|
|
|
ev := configEvent{
|
|
cfg, configUpdate,
|
|
}
|
|
|
|
return es.Publish(ctx, magistralaPrefix+configUpdate, ev)
|
|
}
|
|
|
|
func (es eventStore) UpdateCert(ctx context.Context, session smqauthn.Session, id, clientCert, clientKey, caCert string) (bootstrap.Config, error) {
|
|
cfg, err := es.svc.UpdateCert(ctx, session, id, clientCert, clientKey, caCert)
|
|
if err != nil {
|
|
return cfg, err
|
|
}
|
|
|
|
ev := updateCertEvent{
|
|
configID: id,
|
|
clientCert: clientCert,
|
|
clientKey: clientKey,
|
|
caCert: caCert,
|
|
}
|
|
|
|
if err := es.Publish(ctx, updateCertStream, ev); err != nil {
|
|
return cfg, err
|
|
}
|
|
|
|
return cfg, nil
|
|
}
|
|
|
|
func (es *eventStore) List(ctx context.Context, session smqauthn.Session, filter bootstrap.Filter, offset, limit uint64) (bootstrap.ConfigsPage, error) {
|
|
bp, err := es.svc.List(ctx, session, filter, offset, limit)
|
|
if err != nil {
|
|
return bp, err
|
|
}
|
|
|
|
ev := listConfigsEvent{
|
|
offset: offset,
|
|
limit: limit,
|
|
fullMatch: filter.FullMatch,
|
|
partialMatch: filter.PartialMatch,
|
|
}
|
|
|
|
if err := es.Publish(ctx, listStream, ev); err != nil {
|
|
return bp, err
|
|
}
|
|
|
|
return bp, nil
|
|
}
|
|
|
|
func (es *eventStore) Remove(ctx context.Context, session smqauthn.Session, id string) error {
|
|
if err := es.svc.Remove(ctx, session, id); err != nil {
|
|
return err
|
|
}
|
|
|
|
ev := removeConfigEvent{
|
|
config: id,
|
|
}
|
|
|
|
return es.Publish(ctx, removeStream, ev)
|
|
}
|
|
|
|
func (es *eventStore) Bootstrap(ctx context.Context, externalKey, externalID string, secure bool) (bootstrap.Config, error) {
|
|
cfg, err := es.svc.Bootstrap(ctx, externalKey, externalID, secure)
|
|
|
|
ev := bootstrapEvent{
|
|
cfg,
|
|
externalID,
|
|
true,
|
|
}
|
|
|
|
if err != nil {
|
|
ev.success = false
|
|
}
|
|
|
|
if err := es.Publish(ctx, bootstrapStream, ev); err != nil {
|
|
return cfg, err
|
|
}
|
|
|
|
return cfg, err
|
|
}
|
|
|
|
func (es *eventStore) EnableConfig(ctx context.Context, session smqauthn.Session, id string) (bootstrap.Config, error) {
|
|
cfg, err := es.svc.EnableConfig(ctx, session, id)
|
|
if err != nil {
|
|
return cfg, err
|
|
}
|
|
|
|
ev := enableConfigEvent{configID: id}
|
|
if err := es.Publish(ctx, enableConfigStream, ev); err != nil {
|
|
return cfg, err
|
|
}
|
|
return cfg, nil
|
|
}
|
|
|
|
func (es *eventStore) DisableConfig(ctx context.Context, session smqauthn.Session, id string) (bootstrap.Config, error) {
|
|
cfg, err := es.svc.DisableConfig(ctx, session, id)
|
|
if err != nil {
|
|
return cfg, err
|
|
}
|
|
|
|
ev := disableConfigEvent{configID: id}
|
|
if err := es.Publish(ctx, disableConfigStream, ev); err != nil {
|
|
return cfg, err
|
|
}
|
|
return cfg, nil
|
|
}
|
|
|
|
func (es *eventStore) CreateProfile(ctx context.Context, session smqauthn.Session, p bootstrap.Profile) (bootstrap.Profile, error) {
|
|
saved, err := es.svc.CreateProfile(ctx, session, p)
|
|
if err != nil {
|
|
return saved, err
|
|
}
|
|
ev := profileEvent{saved, profileCreate}
|
|
if err := es.Publish(ctx, createProfileStream, ev); err != nil {
|
|
return saved, err
|
|
}
|
|
return saved, nil
|
|
}
|
|
|
|
func (es *eventStore) ViewProfile(ctx context.Context, session smqauthn.Session, profileID string) (bootstrap.Profile, error) {
|
|
p, err := es.svc.ViewProfile(ctx, session, profileID)
|
|
if err != nil {
|
|
return p, err
|
|
}
|
|
ev := profileEvent{p, profileView}
|
|
if err := es.Publish(ctx, viewProfileStream, ev); err != nil {
|
|
return p, err
|
|
}
|
|
return p, nil
|
|
}
|
|
|
|
func (es *eventStore) UpdateProfile(ctx context.Context, session smqauthn.Session, p bootstrap.Profile) error {
|
|
if err := es.svc.UpdateProfile(ctx, session, p); err != nil {
|
|
return err
|
|
}
|
|
ev := profileEvent{p, profileUpdate}
|
|
return es.Publish(ctx, updateProfileStream, ev)
|
|
}
|
|
|
|
func (es *eventStore) ListProfiles(ctx context.Context, session smqauthn.Session, offset, limit uint64) (bootstrap.ProfilesPage, error) {
|
|
pp, err := es.svc.ListProfiles(ctx, session, offset, limit)
|
|
if err != nil {
|
|
return pp, err
|
|
}
|
|
ev := profileEvent{operation: profileList}
|
|
if err := es.Publish(ctx, listProfilesStream, ev); err != nil {
|
|
return pp, err
|
|
}
|
|
return pp, nil
|
|
}
|
|
|
|
func (es *eventStore) DeleteProfile(ctx context.Context, session smqauthn.Session, profileID string) error {
|
|
if err := es.svc.DeleteProfile(ctx, session, profileID); err != nil {
|
|
return err
|
|
}
|
|
ev := deleteProfileEvent{profileID: profileID}
|
|
return es.Publish(ctx, deleteProfileStream, ev)
|
|
}
|
|
|
|
func (es *eventStore) AssignProfile(ctx context.Context, session smqauthn.Session, configID, profileID string) error {
|
|
if err := es.svc.AssignProfile(ctx, session, configID, profileID); err != nil {
|
|
return err
|
|
}
|
|
ev := assignProfileEvent{configID: configID, profileID: profileID}
|
|
return es.Publish(ctx, assignProfileStream, ev)
|
|
}
|
|
|
|
func (es *eventStore) BindResources(ctx context.Context, session smqauthn.Session, token, configID string, bindings []bootstrap.BindingRequest) error {
|
|
if err := es.svc.BindResources(ctx, session, token, configID, bindings); err != nil {
|
|
return err
|
|
}
|
|
slots := make([]string, len(bindings))
|
|
for i, b := range bindings {
|
|
slots[i] = b.Slot
|
|
}
|
|
ev := bindResourcesEvent{configID: configID, slots: slots}
|
|
return es.Publish(ctx, bindResourcesStream, ev)
|
|
}
|
|
|
|
func (es *eventStore) ListBindings(ctx context.Context, session smqauthn.Session, configID string) ([]bootstrap.BindingSnapshot, error) {
|
|
bs, err := es.svc.ListBindings(ctx, session, configID)
|
|
if err != nil {
|
|
return bs, err
|
|
}
|
|
ev := listBindingsEvent{configID: configID}
|
|
if err := es.Publish(ctx, listBindingsStream, ev); err != nil {
|
|
return bs, err
|
|
}
|
|
return bs, nil
|
|
}
|
|
|
|
func (es *eventStore) RefreshBindings(ctx context.Context, session smqauthn.Session, token, configID string) error {
|
|
if err := es.svc.RefreshBindings(ctx, session, token, configID); err != nil {
|
|
return err
|
|
}
|
|
ev := refreshBindingsEvent{configID: configID}
|
|
return es.Publish(ctx, refreshBindingsStream, ev)
|
|
}
|