Sync with SMQ

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>
This commit is contained in:
Dusan Borovcanin
2025-04-11 15:31:55 +02:00
parent ffad1181cf
commit faaf42941d
8 changed files with 78 additions and 69 deletions
+3 -3
View File
@@ -27,18 +27,18 @@ INTERNAL_PROTO_FILES := $(shell find $(INTERNAL_PROTO_DIR) -name "*.proto" | sed
ifneq ($(MG_MESSAGE_BROKER_TYPE),)
MG_MESSAGE_BROKER_TYPE := $(MG_MESSAGE_BROKER_TYPE)
else
MG_MESSAGE_BROKER_TYPE=nats
MG_MESSAGE_BROKER_TYPE=msg_nats
endif
ifneq ($(MG_ES_TYPE),)
MG_ES_TYPE := $(MG_ES_TYPE)
else
MG_ES_TYPE=nats
MG_ES_TYPE=es_nats
endif
define compile_service
CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) GOARM=$(GOARM) \
go build -tags $(MG_MESSAGE_BROKER_TYPE) --tags $(MG_ES_TYPE) -ldflags "-s -w \
go build -tags $(MG_MESSAGE_BROKER_TYPE) -tags $(MG_ES_TYPE) -ldflags "-s -w \
-X 'github.com/absmach/magistrala.BuildTime=$(TIME)' \
-X 'github.com/absmach/magistrala.Version=$(VERSION)' \
-X 'github.com/absmach/magistrala.Commit=$(COMMIT)'" \
+10 -12
View File
@@ -144,14 +144,13 @@ func (bs bootstrapService) Add(ctx context.Context, session smqauthn.Session, to
if err != nil {
return Config{}, errors.Wrap(errCheckChannels, err)
}
cfg.Channels, err = bs.connectionChannels(toConnect, bs.toIDList(existing), session.DomainID, token)
cfg.Channels, err = bs.connectionChannels(ctx, toConnect, bs.toIDList(existing), session.DomainID, token)
if err != nil {
return Config{}, errors.Wrap(errConnectionChannels, err)
}
id := cfg.ClientID
mgClient, err := bs.client(session.DomainID, id, token)
mgClient, err := bs.client(ctx, session.DomainID, id, token)
if err != nil {
return Config{}, errors.Wrap(errClientNotFound, err)
}
@@ -223,7 +222,7 @@ func (bs bootstrapService) UpdateConnections(ctx context.Context, session smqaut
return errors.Wrap(errUpdateConnections, err)
}
channels, err := bs.connectionChannels(connections, bs.toIDList(existing), session.DomainID, token)
channels, err := bs.connectionChannels(ctx, connections, bs.toIDList(existing), session.DomainID, token)
if err != nil {
return errors.Wrap(errUpdateConnections, err)
}
@@ -336,7 +335,7 @@ func (bs bootstrapService) ChangeState(ctx context.Context, session smqauthn.Ses
switch state {
case Active:
for _, c := range cfg.Channels {
if err := bs.sdk.ConnectClients(context.Background(), c.ID, []string{cfg.ClientID}, []string{"Publish", "Subscribe"}, session.DomainID, token); err != nil {
if err := bs.sdk.ConnectClients(ctx, c.ID, []string{cfg.ClientID}, []string{"Publish", "Subscribe"}, session.DomainID, token); err != nil {
// Ignore conflict errors as they indicate the connection already exists.
if errors.Contains(err, svcerr.ErrConflict) {
continue
@@ -346,7 +345,7 @@ func (bs bootstrapService) ChangeState(ctx context.Context, session smqauthn.Ses
}
case Inactive:
for _, c := range cfg.Channels {
if err := bs.sdk.DisconnectClients(context.Background(), c.ID, []string{cfg.ClientID}, []string{"Publish", "Subscribe"}, session.DomainID, token); err != nil {
if err := bs.sdk.DisconnectClients(ctx, c.ID, []string{cfg.ClientID}, []string{"Publish", "Subscribe"}, session.DomainID, token); err != nil {
if errors.Contains(err, repoerr.ErrNotFound) {
continue
}
@@ -396,29 +395,28 @@ func (bs bootstrapService) DisconnectClientHandler(ctx context.Context, channelI
}
// Method client retrieves SuperMQ Client creating one if an empty ID is passed.
func (bs bootstrapService) client(domainID, id, token string) (mgsdk.Client, error) {
func (bs bootstrapService) client(ctx context.Context, domainID, id, token string) (mgsdk.Client, error) {
// If Client ID is not provided, then create new client.
if id == "" {
id, err := bs.idProvider.ID()
if err != nil {
return mgsdk.Client{}, errors.Wrap(errCreateClient, err)
}
client, sdkErr := bs.sdk.CreateClient(context.Background(), mgsdk.Client{ID: id, Name: "Bootstrapped Client " + id}, domainID, token)
client, sdkErr := bs.sdk.CreateClient(ctx, mgsdk.Client{ID: id, Name: "Bootstrapped Client " + id}, domainID, token)
if sdkErr != nil {
return mgsdk.Client{}, errors.Wrap(errCreateClient, sdkErr)
}
return client, nil
}
// If Client ID is provided, then retrieve client
client, sdkErr := bs.sdk.Client(context.Background(), id, domainID, token)
client, sdkErr := bs.sdk.Client(ctx, id, domainID, token)
if sdkErr != nil {
return mgsdk.Client{}, errors.Wrap(ErrClients, sdkErr)
}
return client, nil
}
func (bs bootstrapService) connectionChannels(channels, existing []string, domainID, token string) ([]Channel, error) {
func (bs bootstrapService) connectionChannels(ctx context.Context, channels, existing []string, domainID, token string) ([]Channel, error) {
add := make(map[string]bool, len(channels))
for _, ch := range channels {
add[ch] = true
@@ -432,7 +430,7 @@ func (bs bootstrapService) connectionChannels(channels, existing []string, domai
var ret []Channel
for id := range add {
ch, err := bs.sdk.Channel(context.Background(), id, domainID, token)
ch, err := bs.sdk.Channel(ctx, id, domainID, token)
if err != nil {
return nil, errors.Wrap(errors.ErrMalformedEntity, err)
}
+1 -1
View File
@@ -35,7 +35,7 @@ SMQ_RABBITMQ_MQTT_QOS=0
SMQ_RABBITMQ_WS_TARGET_PATH=/ws
## Message Broker
SMQ_MESSAGE_BROKER_TYPE=nats
SMQ_MESSAGE_BROKER_TYPE=msg_nats
SMQ_MESSAGE_BROKER_URL=${SMQ_NATS_URL}
## MQTT Broker
+5 -5
View File
@@ -49,24 +49,24 @@ Therefore, the following combinations are possible:
For Message brokers other than NATS, you would need to build the docker images with RabbitMQ as the build tag and change the `docker/.env`. For example, to use RabbitMQ as a message broker:
```bash
SMQ_MESSAGE_BROKER_TYPE=rabbitmq make dockers
SMQ_MESSAGE_BROKER_TYPE=msg_rabbitmq make dockers
```
```env
SMQ_MESSAGE_BROKER_TYPE=rabbitmq
SMQ_MESSAGE_BROKER_TYPE=msg_rabbitmq
SMQ_MESSAGE_BROKER_URL=${SMQ_RABBITMQ_URL}
```
For Redis as an events store, you would need to run RabbitMQ or NATS as a message broker. For example, to use Redis as an events store with rabbitmq as a message broker:
```bash
SMQ_ES_TYPE=redis SMQ_MESSAGE_BROKER_TYPE=rabbitmq make dockers
SMQ_ES_TYPE=es_redis SMQ_MESSAGE_BROKER_TYPE=msg_rabbitmq make dockers
```
```env
SMQ_MESSAGE_BROKER_TYPE=rabbitmq
SMQ_MESSAGE_BROKER_TYPE=msg_rabbitmq
SMQ_MESSAGE_BROKER_URL=${SMQ_RABBITMQ_URL}
SMQ_ES_TYPE=redis
SMQ_ES_TYPE=es_redis
SMQ_ES_URL=${SMQ_REDIS_URL}
```
+1 -1
View File
@@ -5,7 +5,7 @@ go 1.24.2
require (
github.com/0x6flab/namegenerator v1.4.0
github.com/absmach/callhome v0.14.0
github.com/absmach/supermq v0.16.1-0.20250411103150-01f20a29d260
github.com/absmach/supermq v0.16.1-0.20250411132829-0e571d1905af
github.com/authzed/authzed-go v1.3.1-0.20250320210445-0cde0d8c71e2
github.com/authzed/grpcutil v0.0.0-20250221190651-1985b19b35b8
github.com/caarlos0/env/v11 v11.3.1
+2 -2
View File
@@ -28,8 +28,8 @@ github.com/absmach/mgate v0.4.5 h1:l6RmrEsR9jxkdb9WHUSecmT0HA41TkZZQVffFfUAIfI=
github.com/absmach/mgate v0.4.5/go.mod h1:IvRIHZexZPEIAPmmaJF0L5DY2ERjj+GxRGitOW4s6qo=
github.com/absmach/senml v1.0.7 h1:XLvpw0qxbP2QhOz7KLM2ZRar+vSCpSG/0o0kEvWx3No=
github.com/absmach/senml v1.0.7/go.mod h1:3bRIiNc8hq7l3auMs8gQrpsM5hHy7iDuiLILrf/+MfA=
github.com/absmach/supermq v0.16.1-0.20250411103150-01f20a29d260 h1:iJIU3Do+lklQM9OHYQVlf36I5jHHsqWTbdJRR20xB1E=
github.com/absmach/supermq v0.16.1-0.20250411103150-01f20a29d260/go.mod h1:dJqO3luvt+zLVuDhxOdsRRCuv945F86Nf1BXxFro+nU=
github.com/absmach/supermq v0.16.1-0.20250411132829-0e571d1905af h1:FKVu3CX1V+8mob0hvfD0havIWXp9Wcv8nHJOeNhWmUE=
github.com/absmach/supermq v0.16.1-0.20250411132829-0e571d1905af/go.mod h1:dJqO3luvt+zLVuDhxOdsRRCuv945F86Nf1BXxFro+nU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
+46 -37
View File
@@ -4,6 +4,7 @@
package e2e
import (
"context"
"fmt"
"math/rand"
"net/http"
@@ -148,8 +149,8 @@ func createUser(s sdk.SDK, conf Config) (string, string, error) {
Status: sdk.EnabledStatus,
Role: "admin",
}
if _, err := s.CreateUser(user, ""); err != nil {
ctx := context.Background()
if _, err := s.CreateUser(context.Background(), user, ""); err != nil {
return "", "", fmt.Errorf("unable to create user: %w", err)
}
@@ -157,7 +158,7 @@ func createUser(s sdk.SDK, conf Config) (string, string, error) {
Username: user.Credentials.Username,
Password: user.Credentials.Secret,
}
token, err := s.CreateToken(login)
token, err := s.CreateToken(ctx, login)
if err != nil {
return "", "", fmt.Errorf("unable to login user: %w", err)
}
@@ -165,11 +166,11 @@ func createUser(s sdk.SDK, conf Config) (string, string, error) {
dname := fmt.Sprintf("%s%s", conf.Prefix, namesgenerator.Generate())
domain := sdk.Domain{
Name: dname,
Alias: strings.ToLower(dname),
Route: strings.ToLower(dname),
Permission: "admin",
}
domain, err = s.CreateDomain(domain, token.AccessToken)
domain, err = s.CreateDomain(ctx, domain, token.AccessToken)
if err != nil {
return "", "", fmt.Errorf("unable to create domain: %w", err)
}
@@ -178,7 +179,7 @@ func createUser(s sdk.SDK, conf Config) (string, string, error) {
Username: user.Credentials.Username,
Password: user.Credentials.Secret,
}
token, err = s.CreateToken(login)
token, err = s.CreateToken(ctx, login)
if err != nil {
return "", "", fmt.Errorf("unable to login user: %w", err)
}
@@ -189,6 +190,7 @@ func createUser(s sdk.SDK, conf Config) (string, string, error) {
func createUsers(s sdk.SDK, conf Config, token string) ([]sdk.User, error) {
var err error
users := []sdk.User{}
ctx := context.Background()
for i := uint64(0); i < conf.Num; i++ {
user := sdk.User{
@@ -202,7 +204,7 @@ func createUsers(s sdk.SDK, conf Config, token string) ([]sdk.User, error) {
Status: sdk.EnabledStatus,
}
user, err = s.CreateUser(user, token)
user, err = s.CreateUser(ctx, user, token)
if err != nil {
return []sdk.User{}, fmt.Errorf("failed to create the users: %w", err)
}
@@ -215,6 +217,7 @@ func createUsers(s sdk.SDK, conf Config, token string) ([]sdk.User, error) {
func createGroups(s sdk.SDK, conf Config, domainID, token string) ([]sdk.Group, error) {
var err error
groups := []sdk.Group{}
ctx := context.Background()
for i := uint64(0); i < conf.Num; i++ {
group := sdk.Group{
@@ -222,7 +225,7 @@ func createGroups(s sdk.SDK, conf Config, domainID, token string) ([]sdk.Group,
Status: sdk.EnabledStatus,
}
group, err = s.CreateGroup(group, domainID, token)
group, err = s.CreateGroup(ctx, group, domainID, token)
if err != nil {
return []sdk.Group{}, fmt.Errorf("failed to create the group: %w", err)
}
@@ -235,6 +238,7 @@ func createGroups(s sdk.SDK, conf Config, domainID, token string) ([]sdk.Group,
func createClientsInBatch(s sdk.SDK, conf Config, domainID, token string, num uint64) ([]sdk.Client, error) {
var err error
clients := make([]sdk.Client, num)
ctx := context.Background()
for i := uint64(0); i < num; i++ {
clients[i] = sdk.Client{
@@ -242,7 +246,7 @@ func createClientsInBatch(s sdk.SDK, conf Config, domainID, token string, num ui
}
}
clients, err = s.CreateClients(clients, domainID, token)
clients, err = s.CreateClients(ctx, clients, domainID, token)
if err != nil {
return []sdk.Client{}, fmt.Errorf("failed to create the clients: %w", err)
}
@@ -281,12 +285,13 @@ func createClients(s sdk.SDK, conf Config, domainID, token string) ([]sdk.Client
func createChannelsInBatch(s sdk.SDK, conf Config, domainID, token string, num uint64) ([]sdk.Channel, error) {
var err error
channels := make([]sdk.Channel, num)
ctx := context.Background()
for i := uint64(0); i < num; i++ {
channels[i] = sdk.Channel{
Name: fmt.Sprintf("%s%s", conf.Prefix, namesgenerator.Generate()),
}
channels[i], err = s.CreateChannel(channels[i], domainID, token)
channels[i], err = s.CreateChannel(ctx, channels[i], domainID, token)
if err != nil {
return []sdk.Channel{}, fmt.Errorf("failed to create the channels: %w", err)
}
@@ -324,12 +329,13 @@ func createChannels(s sdk.SDK, conf Config, domainID, token string) ([]sdk.Chann
}
func read(s sdk.SDK, conf Config, domainID, token string, users []sdk.User, groups []sdk.Group, clients []sdk.Client, channels []sdk.Channel) error {
ctx := context.Background()
for _, user := range users {
if _, err := s.User(user.ID, token); err != nil {
if _, err := s.User(ctx, user.ID, token); err != nil {
return fmt.Errorf("failed to get user %w", err)
}
}
up, err := s.Users(sdk.PageMetadata{}, token)
up, err := s.Users(ctx, sdk.PageMetadata{}, token)
if err != nil {
return fmt.Errorf("failed to get users %w", err)
}
@@ -337,11 +343,11 @@ func read(s sdk.SDK, conf Config, domainID, token string, users []sdk.User, grou
return fmt.Errorf("returned users %d less than created users %d", up.Total, conf.Num)
}
for _, group := range groups {
if _, err := s.Group(group.ID, domainID, token); err != nil {
if _, err := s.Group(ctx, group.ID, domainID, token); err != nil {
return fmt.Errorf("failed to get group %w", err)
}
}
gp, err := s.Groups(sdk.PageMetadata{}, domainID, token)
gp, err := s.Groups(ctx, sdk.PageMetadata{}, domainID, token)
if err != nil {
return fmt.Errorf("failed to get groups %w", err)
}
@@ -349,11 +355,11 @@ func read(s sdk.SDK, conf Config, domainID, token string, users []sdk.User, grou
return fmt.Errorf("returned groups %d less than created groups %d", gp.Total, conf.Num)
}
for _, c := range clients {
if _, err := s.Client(c.ID, domainID, token); err != nil {
if _, err := s.Client(ctx, c.ID, domainID, token); err != nil {
return fmt.Errorf("failed to get client %w", err)
}
}
tp, err := s.Clients(sdk.PageMetadata{}, domainID, token)
tp, err := s.Clients(ctx, sdk.PageMetadata{}, domainID, token)
if err != nil {
return fmt.Errorf("failed to get clients %w", err)
}
@@ -361,11 +367,11 @@ func read(s sdk.SDK, conf Config, domainID, token string, users []sdk.User, grou
return fmt.Errorf("returned clients %d less than created clients %d", tp.Total, conf.Num)
}
for _, channel := range channels {
if _, err := s.Channel(channel.ID, domainID, token); err != nil {
if _, err := s.Channel(ctx, channel.ID, domainID, token); err != nil {
return fmt.Errorf("failed to get channel %w", err)
}
}
cp, err := s.Channels(sdk.PageMetadata{}, domainID, token)
cp, err := s.Channels(ctx, sdk.PageMetadata{}, domainID, token)
if err != nil {
return fmt.Errorf("failed to get channels %w", err)
}
@@ -377,10 +383,11 @@ func read(s sdk.SDK, conf Config, domainID, token string, users []sdk.User, grou
}
func update(s sdk.SDK, domainID, token string, users []sdk.User, groups []sdk.Group, clients []sdk.Client, channels []sdk.Channel) error {
ctx := context.Background()
for _, user := range users {
user.FirstName = namesgenerator.Generate()
user.Metadata = sdk.Metadata{"Update": namesgenerator.Generate()}
rUser, err := s.UpdateUser(user, token)
rUser, err := s.UpdateUser(ctx, user, token)
if err != nil {
return fmt.Errorf("failed to update user %w", err)
}
@@ -392,7 +399,7 @@ func update(s sdk.SDK, domainID, token string, users []sdk.User, groups []sdk.Gr
}
user = rUser
user.Credentials.Username = namesgenerator.Generate()
rUser, err = s.UpdateUsername(user, token)
rUser, err = s.UpdateUsername(ctx, user, token)
if err != nil {
return fmt.Errorf("failed to update username %w", err)
}
@@ -400,7 +407,7 @@ func update(s sdk.SDK, domainID, token string, users []sdk.User, groups []sdk.Gr
return fmt.Errorf("failed to update user name before %s after %s", user.Credentials.Username, rUser.Credentials.Username)
}
user = rUser
rUser, err = s.UpdateUserEmail(user, token)
rUser, err = s.UpdateUserEmail(ctx, user, token)
if err != nil {
return fmt.Errorf("failed to update user identity %w", err)
}
@@ -409,7 +416,7 @@ func update(s sdk.SDK, domainID, token string, users []sdk.User, groups []sdk.Gr
}
user = rUser
user.Tags = []string{namesgenerator.Generate()}
rUser, err = s.UpdateUserTags(user, token)
rUser, err = s.UpdateUserTags(ctx, user, token)
if err != nil {
return fmt.Errorf("failed to update user tags %w", err)
}
@@ -417,7 +424,7 @@ func update(s sdk.SDK, domainID, token string, users []sdk.User, groups []sdk.Gr
return fmt.Errorf("failed to update user tags before %s after %s", user.Tags[0], rUser.Tags[0])
}
user = rUser
rUser, err = s.DisableUser(user.ID, token)
rUser, err = s.DisableUser(ctx, user.ID, token)
if err != nil {
return fmt.Errorf("failed to disable user %w", err)
}
@@ -425,7 +432,7 @@ func update(s sdk.SDK, domainID, token string, users []sdk.User, groups []sdk.Gr
return fmt.Errorf("failed to disable user before %s after %s", user.Status, rUser.Status)
}
user = rUser
rUser, err = s.EnableUser(user.ID, token)
rUser, err = s.EnableUser(ctx, user.ID, token)
if err != nil {
return fmt.Errorf("failed to enable user %w", err)
}
@@ -436,7 +443,7 @@ func update(s sdk.SDK, domainID, token string, users []sdk.User, groups []sdk.Gr
for _, group := range groups {
group.Name = namesgenerator.Generate()
group.Metadata = sdk.Metadata{"Update": namesgenerator.Generate()}
rGroup, err := s.UpdateGroup(group, domainID, token)
rGroup, err := s.UpdateGroup(ctx, group, domainID, token)
if err != nil {
return fmt.Errorf("failed to update group %w", err)
}
@@ -447,7 +454,7 @@ func update(s sdk.SDK, domainID, token string, users []sdk.User, groups []sdk.Gr
return fmt.Errorf("failed to update group metadata before %s after %s", group.Metadata["Update"], rGroup.Metadata["Update"])
}
group = rGroup
rGroup, err = s.DisableGroup(group.ID, domainID, token)
rGroup, err = s.DisableGroup(ctx, group.ID, domainID, token)
if err != nil {
return fmt.Errorf("failed to disable group %w", err)
}
@@ -455,7 +462,7 @@ func update(s sdk.SDK, domainID, token string, users []sdk.User, groups []sdk.Gr
return fmt.Errorf("failed to disable group before %s after %s", group.Status, rGroup.Status)
}
group = rGroup
rGroup, err = s.EnableGroup(group.ID, domainID, token)
rGroup, err = s.EnableGroup(ctx, group.ID, domainID, token)
if err != nil {
return fmt.Errorf("failed to enable group %w", err)
}
@@ -466,7 +473,7 @@ func update(s sdk.SDK, domainID, token string, users []sdk.User, groups []sdk.Gr
for _, t := range clients {
t.Name = namesgenerator.Generate()
t.Metadata = sdk.Metadata{"Update": namesgenerator.Generate()}
rClient, err := s.UpdateClient(t, domainID, token)
rClient, err := s.UpdateClient(ctx, t, domainID, token)
if err != nil {
return fmt.Errorf("failed to update client %w", err)
}
@@ -477,13 +484,13 @@ func update(s sdk.SDK, domainID, token string, users []sdk.User, groups []sdk.Gr
return fmt.Errorf("failed to update client metadata before %s after %s", t.Metadata["Update"], rClient.Metadata["Update"])
}
t = rClient
rClient, err = s.UpdateClientSecret(t.ID, t.Credentials.Secret, domainID, token)
rClient, err = s.UpdateClientSecret(ctx, t.ID, t.Credentials.Secret, domainID, token)
if err != nil {
return fmt.Errorf("failed to update client secret %w", err)
}
t = rClient
t.Tags = []string{namesgenerator.Generate()}
rClient, err = s.UpdateClientTags(t, domainID, token)
rClient, err = s.UpdateClientTags(ctx, t, domainID, token)
if err != nil {
return fmt.Errorf("failed to update client tags %w", err)
}
@@ -491,7 +498,7 @@ func update(s sdk.SDK, domainID, token string, users []sdk.User, groups []sdk.Gr
return fmt.Errorf("failed to update client tags before %s after %s", t.Tags[0], rClient.Tags[0])
}
t = rClient
rClient, err = s.DisableClient(t.ID, domainID, token)
rClient, err = s.DisableClient(ctx, t.ID, domainID, token)
if err != nil {
return fmt.Errorf("failed to disable client %w", err)
}
@@ -499,7 +506,7 @@ func update(s sdk.SDK, domainID, token string, users []sdk.User, groups []sdk.Gr
return fmt.Errorf("failed to disable client before %s after %s", t.Status, rClient.Status)
}
t = rClient
rClient, err = s.EnableClient(t.ID, domainID, token)
rClient, err = s.EnableClient(ctx, t.ID, domainID, token)
if err != nil {
return fmt.Errorf("failed to enable client %w", err)
}
@@ -510,7 +517,7 @@ func update(s sdk.SDK, domainID, token string, users []sdk.User, groups []sdk.Gr
for _, channel := range channels {
channel.Name = namesgenerator.Generate()
channel.Metadata = sdk.Metadata{"Update": namesgenerator.Generate()}
rChannel, err := s.UpdateChannel(channel, domainID, token)
rChannel, err := s.UpdateChannel(ctx, channel, domainID, token)
if err != nil {
return fmt.Errorf("failed to update channel %w", err)
}
@@ -521,7 +528,7 @@ func update(s sdk.SDK, domainID, token string, users []sdk.User, groups []sdk.Gr
return fmt.Errorf("failed to update channel metadata before %s after %s", channel.Metadata["Update"], rChannel.Metadata["Update"])
}
channel = rChannel
rChannel, err = s.DisableChannel(channel.ID, domainID, token)
rChannel, err = s.DisableChannel(ctx, channel.ID, domainID, token)
if err != nil {
return fmt.Errorf("failed to disable channel %w", err)
}
@@ -529,7 +536,7 @@ func update(s sdk.SDK, domainID, token string, users []sdk.User, groups []sdk.Gr
return fmt.Errorf("failed to disable channel before %s after %s", channel.Status, rChannel.Status)
}
channel = rChannel
rChannel, err = s.EnableChannel(channel.ID, domainID, token)
rChannel, err = s.EnableChannel(ctx, channel.ID, domainID, token)
if err != nil {
return fmt.Errorf("failed to enable channel %w", err)
}
@@ -539,6 +546,7 @@ func update(s sdk.SDK, domainID, token string, users []sdk.User, groups []sdk.Gr
}
func messaging(s sdk.SDK, conf Config, domainID, token string, clients []sdk.Client, channels []sdk.Channel) error {
ctx := context.Background()
for _, c := range clients {
for _, channel := range channels {
conn := sdk.Connection{
@@ -546,7 +554,7 @@ func messaging(s sdk.SDK, conf Config, domainID, token string, clients []sdk.Cli
ChannelIDs: []string{channel.ID},
Types: []string{"publish", "subscribe"},
}
if err := s.Connect(conn, domainID, token); err != nil {
if err := s.Connect(ctx, conn, domainID, token); err != nil {
return fmt.Errorf("failed to connect client %s to channel %s", c.ID, channel.ID)
}
}
@@ -585,7 +593,8 @@ func messaging(s sdk.SDK, conf Config, domainID, token string, clients []sdk.Cli
}
func sendHTTPMessage(s sdk.SDK, msg string, client sdk.Client, chanID string) error {
if err := s.SendMessage(chanID, msg, client.Credentials.Secret); err != nil {
ctx := context.Background()
if err := s.SendMessage(ctx, client.DomainID, chanID, msg, client.Credentials.Secret); err != nil {
return fmt.Errorf("HTTP failed to send message from client %s to channel %s: %w", client.ID, chanID, err)
}
+10 -8
View File
@@ -6,6 +6,7 @@ package provision
import (
"bufio"
"bytes"
"context"
"crypto/ecdsa"
"crypto/rand"
"crypto/rsa"
@@ -56,6 +57,7 @@ type Config struct {
// Provision - function that does actual provisiong.
func Provision(conf Config) error {
ctx := context.Background()
const (
rsaBits = 4096
ttl = "2400h"
@@ -89,14 +91,14 @@ func Provision(conf Config) error {
}
// Create new user
if _, err := s.CreateUser(user, ""); err != nil {
if _, err := s.CreateUser(ctx, user, ""); err != nil {
return fmt.Errorf("unable to create new user: %s", err.Error())
}
var err error
// Login user
token, err := s.CreateToken(supermqSDK.Login{Username: user.Credentials.Username, Password: user.Credentials.Secret})
token, err := s.CreateToken(ctx, supermqSDK.Login{Username: user.Credentials.Username, Password: user.Credentials.Secret})
if err != nil {
return fmt.Errorf("unable to login user: %s", err.Error())
}
@@ -105,16 +107,16 @@ func Provision(conf Config) error {
dname := fmt.Sprintf("%s%s", conf.Prefix, namesgenerator.Generate())
domain := supermqSDK.Domain{
Name: dname,
Alias: strings.ToLower(dname),
Route: strings.ToLower(dname),
Permission: "admin",
}
domain, err = s.CreateDomain(domain, token.AccessToken)
domain, err = s.CreateDomain(ctx, domain, token.AccessToken)
if err != nil {
return fmt.Errorf("unable to create domain: %w", err)
}
// Login to domain
token, err = s.CreateToken(supermqSDK.Login{
token, err = s.CreateToken(ctx, supermqSDK.Login{
Username: user.Credentials.Username,
Password: user.Credentials.Secret,
})
@@ -160,14 +162,14 @@ func Provision(conf Config) error {
channels[i] = supermqSDK.Channel{Name: fmt.Sprintf("%s-channel-%d", conf.Prefix, i)}
}
clients, err = s.CreateClients(clients, domain.ID, token.AccessToken)
clients, err = s.CreateClients(ctx, clients, domain.ID, token.AccessToken)
if err != nil {
return fmt.Errorf("failed to create the clients: %s", err.Error())
}
var chs []supermqSDK.Channel
for _, c := range channels {
c, err = s.CreateChannel(c, domain.ID, token.AccessToken)
c, err = s.CreateChannel(ctx, c, domain.ID, token.AccessToken)
if err != nil {
return fmt.Errorf("failed to create the chennels: %s", err.Error())
}
@@ -263,7 +265,7 @@ func Provision(conf Config) error {
ChannelIDs: []string{cID},
Types: []string{"publish", "subscribe"},
}
if err := s.Connect(conIDs, domain.ID, token.AccessToken); err != nil {
if err := s.Connect(ctx, conIDs, domain.ID, token.AccessToken); err != nil {
log.Fatalf("Failed to connect clients %s to channels %s: %s", tID, cID, err)
}
}