mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 07:30:25 +00:00
b3e2f41194
* WIP: alarms service * fix(alarms): remove rule entity since it is not stored here Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * test(alarms): add tests cases for invalid alarms * feat(alarms): add authorization * feat(alarms): add docker deployment files Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * fix: update go mod file * feat(alarms): support filtering by resolved_by, updated_by and severity Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * style: fix linter errors Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * fix(alarms): provide correct otel naming for create alarm Fixes https://github.com/absmach/magistrala/pull/106#discussion_r2030151971 Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * fix(alarms): group routes appropriately Resolves https://github.com/absmach/magistrala/pull/106#discussion_r2030160891 Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * fix(alarms): extract alarm id from url path rather than query params Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * fix(alarms): add all status to help in decoding Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * style(alarms): maintain consistent import as naming for supermq api package Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * refactor(alarms): update supermq dependecy to the latest Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * fix(alarms): Add domains gRPC service config to alarms service Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * test(alarms): all CRUD operations from the service Return empty results instead of nil This standardizes error responses across alarm endpoints to return empty result structs rather than nil. Also renames entityReq to alarmReq and adds HTTP status codes for created/deleted alarms. 
Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * test(alarms): fix failing tests due to introduction of context on sdk Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * fix(alarms): remove channel id Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * fix(alarms): standardize error handling across CRUD operations Updated error responses to use specific repository errors for consistency Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * feat(alarms): add assignment fields to Alarm model and database Introduced AssignedAt and AssignedBy fields to the Alarm struct and updated the database schema accordingly. Enhanced the UpdateAlarm function to handle these new fields, ensuring proper assignment tracking in the alarms system. Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * feat(alarms): enhance Alarm model with measurement attributes Updated the Alarm struct to include Measurement, Value, Unit, and Cause fields. Modified the validation logic to ensure these fields are present. Adjusted logging and tracing middleware to reflect the new attributes. Updated database schema and related functions to accommodate these changes, ensuring comprehensive alarm data management. 
Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * feat(alarms): consume events from pubsub for creation of alarms Removed session dependencies from CreateAlarm method and enhanced alarm validation to ensure all required fields are present Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * style(alarms): add newline at the end of docker compose Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * fix(alarms): Add assignee id and metadata fields when consuming messages Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * feat(alarms): add acknowledged field Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * feat(alarms): Add threshold value for the specific measurement Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * feat(alarms): Add channel, thing, and subtopic fields to Alarm model This change adds required fields for tracking alarm sources and reorganizes alarm-related fields for better grouping. Alarms now track the channel, thing, and subtopic that triggered them, along with domain and rule info. 
Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * test(alarms): add service layer tests Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * fix(alarms): consume created at from message rather than creating it Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * feat(alarms): ready alarm as a gob encoded object Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * fix(alarms): read alarms from alarms queue and remove transformer g Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * feat(alarms): update version of supermq Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * feat(alarms): add gob transformer Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * fix(alarms): rename thing id to client id Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * fix(alarms): create alarms stream Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * fix(alarms): check on logic to create new alarm create new alarm if severity, status, subtopic changes enhance logging with additional details for alarms management Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * remove conusmer and use pubsub Signed-off-by: Rodney Osodo <socials@rodneyosodo.com> * fix(alarms): use build tags for rabbitmq and nats * fix(alarms): add health and metrics endpoint * fix(magistrala): use supermq as build flags to see version and commit * fix(alarms): use js config * fix(alarms): remove validation when updating an alarm fix authorization too --------- Signed-off-by: Rodney Osodo <socials@rodneyosodo.com>
426 lines
13 KiB
Go
426 lines
13 KiB
Go
// Copyright (c) Abstract Machines
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package provision
|
|
|
|
import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"maps"

	"github.com/absmach/magistrala/pkg/sdk"
	"github.com/absmach/supermq/pkg/errors"
	smqSDK "github.com/absmach/supermq/pkg/sdk"
)
|
|
|
|
// Metadata keys and values used while provisioning.
const (
	// externalIDKey is the client metadata key whose presence marks a client
	// as needing a bootstrap configuration.
	externalIDKey = "external_id"
	// gateway is the value written to the Type field of the gateway metadata.
	gateway = "gateway"
	// Active is the state passed to the SDK when whitelisting a client.
	Active = 1
)

// Values of the channel metadata "type" entry recognised when the gateway
// metadata is assembled.
const (
	control = "control"
	data    = "data"
	export  = "export"
)
|
|
|
|
var (
|
|
ErrUnauthorized = errors.New("unauthorized access")
|
|
ErrFailedToCreateToken = errors.New("failed to create access token")
|
|
ErrEmptyClientsList = errors.New("clients list in configuration empty")
|
|
ErrClientUpdate = errors.New("failed to update client")
|
|
ErrEmptyChannelsList = errors.New("channels list in configuration is empty")
|
|
ErrFailedChannelCreation = errors.New("failed to create channel")
|
|
ErrFailedChannelRetrieval = errors.New("failed to retrieve channel")
|
|
ErrFailedClientCreation = errors.New("failed to create client")
|
|
ErrFailedClientRetrieval = errors.New("failed to retrieve client")
|
|
ErrMissingCredentials = errors.New("missing credentials")
|
|
ErrFailedBootstrapRetrieval = errors.New("failed to retrieve bootstrap")
|
|
ErrFailedCertCreation = errors.New("failed to create certificates")
|
|
ErrFailedCertView = errors.New("failed to view certificate")
|
|
ErrFailedBootstrap = errors.New("failed to create bootstrap config")
|
|
ErrFailedBootstrapValidate = errors.New("failed to validate bootstrap config creation")
|
|
ErrGatewayUpdate = errors.New("failed to updated gateway metadata")
|
|
|
|
limit uint = 10
|
|
offset uint = 0
|
|
)
|
|
|
|
// Compile-time check that *provisionService satisfies Service.
var _ Service = (*provisionService)(nil)
|
|
|
|
// Service specifies Provision service API.
|
|
type Service interface {
|
|
// Provision is the only method this API specifies. Depending on the configuration,
|
|
// the following actions will can be executed:
|
|
// - create a Client based on external_id (eg. MAC address)
|
|
// - create multiple Channels
|
|
// - create Bootstrap configuration
|
|
// - whitelist Client in Bootstrap configuration == connect Client to Channels
|
|
Provision(ctx context.Context, domainID, token, name, externalID, externalKey string) (Result, error)
|
|
|
|
// Mapping returns current configuration used for provision
|
|
// useful for using in ui to create configuration that matches
|
|
// one created with Provision method.
|
|
Mapping(ctx context.Context, token string) (map[string]interface{}, error)
|
|
|
|
// Certs creates certificate for clients that communicate over mTLS
|
|
// A duration string is a possibly signed sequence of decimal numbers,
|
|
// each with optional fraction and a unit suffix, such as "300ms", "-1.5h" or "2h45m".
|
|
// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
|
|
Cert(ctx context.Context, domainID, token, clientID, duration string) (string, string, error)
|
|
}
|
|
|
|
type provisionService struct {
|
|
logger *slog.Logger
|
|
sdk sdk.SDK
|
|
conf Config
|
|
}
|
|
|
|
// Result represent what is created with additional info.
|
|
type Result struct {
|
|
Clients []smqSDK.Client `json:"clients,omitempty"`
|
|
Channels []smqSDK.Channel `json:"channels,omitempty"`
|
|
ClientCert map[string]string `json:"client_cert,omitempty"`
|
|
ClientKey map[string]string `json:"client_key,omitempty"`
|
|
CACert string `json:"ca_cert,omitempty"`
|
|
Whitelisted map[string]bool `json:"whitelisted,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
// New returns new provision service.
|
|
func New(cfg Config, mgsdk sdk.SDK, logger *slog.Logger) Service {
|
|
return &provisionService{
|
|
logger: logger,
|
|
conf: cfg,
|
|
sdk: mgsdk,
|
|
}
|
|
}
|
|
|
|
// Mapping retrieves current configuration.
|
|
func (ps *provisionService) Mapping(ctx context.Context, token string) (map[string]interface{}, error) {
|
|
pm := smqSDK.PageMetadata{
|
|
Offset: uint64(offset),
|
|
Limit: uint64(limit),
|
|
}
|
|
|
|
if _, err := ps.sdk.Users(ctx, pm, token); err != nil {
|
|
return map[string]interface{}{}, errors.Wrap(ErrUnauthorized, err)
|
|
}
|
|
|
|
return ps.conf.Bootstrap.Content, nil
|
|
}
|
|
|
|
// Provision is provision method for creating setup according to
|
|
// provision layout specified in config.toml.
|
|
func (ps *provisionService) Provision(ctx context.Context, domainID, token, name, externalID, externalKey string) (res Result, err error) {
|
|
var channels []smqSDK.Channel
|
|
var clients []smqSDK.Client
|
|
defer ps.recover(ctx, &err, &clients, &channels, &domainID, &token)
|
|
|
|
token, err = ps.createTokenIfEmpty(ctx, token)
|
|
if err != nil {
|
|
return res, errors.Wrap(ErrFailedToCreateToken, err)
|
|
}
|
|
|
|
if len(ps.conf.Clients) == 0 {
|
|
return res, ErrEmptyClientsList
|
|
}
|
|
if len(ps.conf.Channels) == 0 {
|
|
return res, ErrEmptyChannelsList
|
|
}
|
|
for _, c := range ps.conf.Clients {
|
|
// If client in configs contains metadata with external_id
|
|
// set value for it from the provision request
|
|
if _, ok := c.Metadata[externalIDKey]; ok {
|
|
c.Metadata[externalIDKey] = externalID
|
|
}
|
|
|
|
cli := smqSDK.Client{
|
|
Metadata: c.Metadata,
|
|
}
|
|
if name == "" {
|
|
name = c.Name
|
|
}
|
|
cli.Name = name
|
|
cli, err := ps.sdk.CreateClient(ctx, cli, domainID, token)
|
|
if err != nil {
|
|
res.Error = err.Error()
|
|
return res, errors.Wrap(ErrFailedClientCreation, err)
|
|
}
|
|
|
|
// Get newly created client (in order to get the key).
|
|
cli, err = ps.sdk.Client(ctx, cli.ID, domainID, token)
|
|
if err != nil {
|
|
e := errors.Wrap(err, fmt.Errorf("client id: %s", cli.ID))
|
|
return res, errors.Wrap(ErrFailedClientRetrieval, e)
|
|
}
|
|
clients = append(clients, cli)
|
|
}
|
|
|
|
for _, channel := range ps.conf.Channels {
|
|
ch := smqSDK.Channel{
|
|
Name: name + "_" + channel.Name,
|
|
Metadata: smqSDK.Metadata(channel.Metadata),
|
|
}
|
|
ch, err := ps.sdk.CreateChannel(ctx, ch, domainID, token)
|
|
if err != nil {
|
|
return res, errors.Wrap(ErrFailedChannelCreation, err)
|
|
}
|
|
ch, err = ps.sdk.Channel(ctx, ch.ID, domainID, token)
|
|
if err != nil {
|
|
e := errors.Wrap(err, fmt.Errorf("channel id: %s", ch.ID))
|
|
return res, errors.Wrap(ErrFailedChannelRetrieval, e)
|
|
}
|
|
channels = append(channels, ch)
|
|
}
|
|
|
|
res = Result{
|
|
Clients: clients,
|
|
Channels: channels,
|
|
Whitelisted: map[string]bool{},
|
|
ClientCert: map[string]string{},
|
|
ClientKey: map[string]string{},
|
|
}
|
|
|
|
var cert smqSDK.Cert
|
|
var bsConfig sdk.BootstrapConfig
|
|
for _, c := range clients {
|
|
var chanIDs []string
|
|
|
|
for _, ch := range channels {
|
|
chanIDs = append(chanIDs, ch.ID)
|
|
}
|
|
content, err := json.Marshal(ps.conf.Bootstrap.Content)
|
|
if err != nil {
|
|
return Result{}, errors.Wrap(ErrFailedBootstrap, err)
|
|
}
|
|
|
|
if ps.conf.Bootstrap.Provision && needsBootstrap(c) {
|
|
bsReq := sdk.BootstrapConfig{
|
|
ClientID: c.ID,
|
|
ExternalID: externalID,
|
|
ExternalKey: externalKey,
|
|
Channels: chanIDs,
|
|
CACert: res.CACert,
|
|
ClientCert: cert.Certificate,
|
|
ClientKey: cert.Key,
|
|
Content: string(content),
|
|
}
|
|
bsid, err := ps.sdk.AddBootstrap(ctx, bsReq, domainID, token)
|
|
if err != nil {
|
|
return Result{}, errors.Wrap(ErrFailedBootstrap, err)
|
|
}
|
|
|
|
bsConfig, err = ps.sdk.ViewBootstrap(ctx, bsid, domainID, token)
|
|
if err != nil {
|
|
return Result{}, errors.Wrap(ErrFailedBootstrapValidate, err)
|
|
}
|
|
}
|
|
|
|
if ps.conf.Bootstrap.X509Provision {
|
|
var cert smqSDK.Cert
|
|
|
|
cert, err = ps.sdk.IssueCert(ctx, c.ID, ps.conf.Cert.TTL, domainID, token)
|
|
if err != nil {
|
|
e := errors.Wrap(err, fmt.Errorf("client id: %s", c.ID))
|
|
return res, errors.Wrap(ErrFailedCertCreation, e)
|
|
}
|
|
cert, err := ps.sdk.ViewCert(ctx, cert.SerialNumber, domainID, token)
|
|
if err != nil {
|
|
return res, errors.Wrap(ErrFailedCertView, err)
|
|
}
|
|
|
|
res.ClientCert[c.ID] = cert.Certificate
|
|
res.ClientKey[c.ID] = cert.Key
|
|
res.CACert = ""
|
|
|
|
if needsBootstrap(c) {
|
|
if _, err = ps.sdk.UpdateBootstrapCerts(ctx, bsConfig.ClientID, cert.Certificate, cert.Key, "", domainID, token); err != nil {
|
|
return Result{}, errors.Wrap(ErrFailedCertCreation, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
if ps.conf.Bootstrap.AutoWhiteList {
|
|
if err := ps.sdk.Whitelist(ctx, c.ID, Active, domainID, token); err != nil {
|
|
res.Error = err.Error()
|
|
return res, ErrClientUpdate
|
|
}
|
|
res.Whitelisted[c.ID] = true
|
|
}
|
|
}
|
|
|
|
if err = ps.updateGateway(ctx, domainID, token, bsConfig, channels); err != nil {
|
|
return res, err
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
func (ps *provisionService) Cert(ctx context.Context, domainID, token, clientID, ttl string) (string, string, error) {
|
|
token, err := ps.createTokenIfEmpty(ctx, token)
|
|
if err != nil {
|
|
return "", "", errors.Wrap(ErrFailedToCreateToken, err)
|
|
}
|
|
|
|
th, err := ps.sdk.Client(ctx, clientID, domainID, token)
|
|
if err != nil {
|
|
return "", "", errors.Wrap(ErrUnauthorized, err)
|
|
}
|
|
cert, err := ps.sdk.IssueCert(ctx, th.ID, ps.conf.Cert.TTL, domainID, token)
|
|
if err != nil {
|
|
return "", "", errors.Wrap(ErrFailedCertCreation, err)
|
|
}
|
|
cert, err = ps.sdk.ViewCert(ctx, cert.SerialNumber, domainID, token)
|
|
if err != nil {
|
|
return "", "", errors.Wrap(ErrFailedCertView, err)
|
|
}
|
|
return cert.Certificate, cert.Key, err
|
|
}
|
|
|
|
func (ps *provisionService) createTokenIfEmpty(ctx context.Context, token string) (string, error) {
|
|
if token != "" {
|
|
return token, nil
|
|
}
|
|
|
|
// If no token in request is provided
|
|
// use API key provided in config file or env
|
|
if ps.conf.Server.MgAPIKey != "" {
|
|
return ps.conf.Server.MgAPIKey, nil
|
|
}
|
|
|
|
// If no API key use username and password provided to create access token.
|
|
if ps.conf.Server.MgUsername == "" || ps.conf.Server.MgPass == "" {
|
|
return token, ErrMissingCredentials
|
|
}
|
|
|
|
u := smqSDK.Login{
|
|
Username: ps.conf.Server.MgUsername,
|
|
Password: ps.conf.Server.MgPass,
|
|
}
|
|
tkn, err := ps.sdk.CreateToken(ctx, u)
|
|
if err != nil {
|
|
return token, errors.Wrap(ErrFailedToCreateToken, err)
|
|
}
|
|
|
|
return tkn.AccessToken, nil
|
|
}
|
|
|
|
func (ps *provisionService) updateGateway(ctx context.Context, domainID, token string, bs sdk.BootstrapConfig, channels []smqSDK.Channel) error {
|
|
var gw Gateway
|
|
for _, ch := range channels {
|
|
switch ch.Metadata["type"] {
|
|
case control:
|
|
gw.CtrlChannelID = ch.ID
|
|
case data:
|
|
gw.DataChannelID = ch.ID
|
|
case export:
|
|
gw.ExportChannelID = ch.ID
|
|
}
|
|
}
|
|
gw.ExternalID = bs.ExternalID
|
|
gw.ExternalKey = bs.ExternalKey
|
|
gw.CfgID = bs.ClientID
|
|
gw.Type = gateway
|
|
|
|
c, sdkerr := ps.sdk.Client(ctx, bs.ClientID, domainID, token)
|
|
if sdkerr != nil {
|
|
return errors.Wrap(ErrGatewayUpdate, sdkerr)
|
|
}
|
|
b, err := json.Marshal(gw)
|
|
if err != nil {
|
|
return errors.Wrap(ErrGatewayUpdate, err)
|
|
}
|
|
if err := json.Unmarshal(b, &c.Metadata); err != nil {
|
|
return errors.Wrap(ErrGatewayUpdate, err)
|
|
}
|
|
if _, err := ps.sdk.UpdateClient(ctx, c, domainID, token); err != nil {
|
|
return errors.Wrap(ErrGatewayUpdate, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (ps *provisionService) errLog(err error) {
|
|
if err != nil {
|
|
ps.logger.Error(fmt.Sprintf("Error recovering: %s", err))
|
|
}
|
|
}
|
|
|
|
func clean(ctx context.Context, ps *provisionService, clients []smqSDK.Client, channels []smqSDK.Channel, domainID, token string) {
|
|
for _, t := range clients {
|
|
err := ps.sdk.DeleteClient(ctx, t.ID, domainID, token)
|
|
ps.errLog(err)
|
|
}
|
|
for _, c := range channels {
|
|
err := ps.sdk.DeleteChannel(ctx, c.ID, domainID, token)
|
|
ps.errLog(err)
|
|
}
|
|
}
|
|
|
|
func (ps *provisionService) recover(ctx context.Context, e *error, ths *[]smqSDK.Client, chs *[]smqSDK.Channel, dm, tkn *string) {
|
|
if e == nil {
|
|
return
|
|
}
|
|
clients, channels, domainID, token, err := *ths, *chs, *dm, *tkn, *e
|
|
|
|
if errors.Contains(err, ErrFailedClientRetrieval) || errors.Contains(err, ErrFailedChannelCreation) {
|
|
for _, c := range clients {
|
|
err := ps.sdk.DeleteClient(ctx, c.ID, domainID, token)
|
|
ps.errLog(err)
|
|
}
|
|
return
|
|
}
|
|
|
|
if errors.Contains(err, ErrFailedBootstrap) || errors.Contains(err, ErrFailedChannelRetrieval) {
|
|
clean(ctx, ps, clients, channels, domainID, token)
|
|
return
|
|
}
|
|
|
|
if errors.Contains(err, ErrFailedBootstrapValidate) || errors.Contains(err, ErrFailedCertCreation) {
|
|
clean(ctx, ps, clients, channels, domainID, token)
|
|
for _, th := range clients {
|
|
if needsBootstrap(th) {
|
|
ps.errLog(ps.sdk.RemoveBootstrap(ctx, th.ID, domainID, token))
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
if errors.Contains(err, ErrFailedBootstrapValidate) || errors.Contains(err, ErrFailedCertCreation) {
|
|
clean(ctx, ps, clients, channels, domainID, token)
|
|
for _, th := range clients {
|
|
if needsBootstrap(th) {
|
|
bs, err := ps.sdk.ViewBootstrap(ctx, th.ID, domainID, token)
|
|
ps.errLog(errors.Wrap(ErrFailedBootstrapRetrieval, err))
|
|
ps.errLog(ps.sdk.RemoveBootstrap(ctx, bs.ClientID, domainID, token))
|
|
}
|
|
}
|
|
}
|
|
|
|
if errors.Contains(err, ErrClientUpdate) || errors.Contains(err, ErrGatewayUpdate) {
|
|
clean(ctx, ps, clients, channels, domainID, token)
|
|
for _, th := range clients {
|
|
if ps.conf.Bootstrap.X509Provision && needsBootstrap(th) {
|
|
_, err := ps.sdk.RevokeCert(ctx, th.ID, domainID, token)
|
|
ps.errLog(err)
|
|
}
|
|
if needsBootstrap(th) {
|
|
bs, err := ps.sdk.ViewBootstrap(ctx, th.ID, domainID, token)
|
|
ps.errLog(errors.Wrap(ErrFailedBootstrapRetrieval, err))
|
|
ps.errLog(ps.sdk.RemoveBootstrap(ctx, bs.ClientID, domainID, token))
|
|
}
|
|
}
|
|
return
|
|
}
|
|
}
|
|
|
|
func needsBootstrap(th smqSDK.Client) bool {
|
|
if th.Metadata == nil {
|
|
return false
|
|
}
|
|
|
|
if _, ok := th.Metadata[externalIDKey]; ok {
|
|
return true
|
|
}
|
|
return false
|
|
}
|