MG-2142 - Consume Things connect/disconnect event in Bootstrap (#2192)

Signed-off-by: JeffMboya <jangina.mboya@gmail.com>
This commit is contained in:
JMboya
2024-06-24 12:02:09 +03:00
committed by GitHub
parent 424aa7cf80
commit 94053f37ee
17 changed files with 590 additions and 226 deletions
+18
View File
@@ -257,6 +257,24 @@ func (lm *loggingMiddleware) RemoveChannelHandler(ctx context.Context, id string
return lm.svc.RemoveChannelHandler(ctx, id)
}
func (lm *loggingMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) (err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
slog.String("channel_id", channelID),
slog.String("thing_id", thingID),
}
if err != nil {
args = append(args, slog.Any("error", err))
lm.logger.Warn("Connect thing handler failed to complete successfully", args...)
return
}
lm.logger.Info("Connect thing handler completed successfully", args...)
}(time.Now())
return lm.svc.ConnectThingHandler(ctx, channelID, thingID)
}
func (lm *loggingMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) (err error) {
defer func(begin time.Time) {
args := []any{
+10
View File
@@ -150,6 +150,16 @@ func (mm *metricsMiddleware) RemoveChannelHandler(ctx context.Context, id string
return mm.svc.RemoveChannelHandler(ctx, id)
}
// ConnectThingHandler instruments ConnectThingHandler method with metrics.
func (mm *metricsMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) (err error) {
defer func(begin time.Time) {
mm.counter.With("method", "connect_thing_handler").Add(1)
mm.latency.With("method", "connect_thing_handler").Observe(time.Since(begin).Seconds())
}(time.Now())
return mm.svc.ConnectThingHandler(ctx, channelID, thingID)
}
// DisconnectThingHandler instruments DisconnectThingHandler method with metrics.
func (mm *metricsMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) (err error) {
defer func(begin time.Time) {
+4 -2
View File
@@ -112,7 +112,9 @@ type ConfigRepository interface {
// RemoveChannel removes channel with the given ID.
RemoveChannel(ctx context.Context, id string) error
// DisconnectHandler changes state of the Config when the corresponding Thing is
// disconnected from the Channel.
// ConnectThing changes state of the Config when the corresponding Thing is connected to the Channel.
ConnectThing(ctx context.Context, channelID, thingID string) error
// DisconnectThing changes state of the Config when the corresponding Thing is disconnected from the Channel.
DisconnectThing(ctx context.Context, channelID, thingID string) error
}
+2 -2
View File
@@ -18,7 +18,7 @@ type updateChannelEvent struct {
}
// Connection event is either connect or disconnect event.
type disconnectEvent struct {
thingID string
type connectionEvent struct {
thingIDs []string
channelID string
}
+67 -6
View File
@@ -9,16 +9,21 @@ import (
"time"
"github.com/absmach/magistrala/bootstrap"
svcerr "github.com/absmach/magistrala/pkg/errors/service"
"github.com/absmach/magistrala/pkg/events"
)
const (
thingRemove = "thing.remove"
thingDisconnect = "policy.delete"
thingConnect = "group.assign"
thingDisconnect = "group.unassign"
channelPrefix = "group."
channelUpdate = channelPrefix + "update"
channelRemove = channelPrefix + "remove"
memberKind = "things"
relation = "group"
)
type eventHandler struct {
@@ -42,9 +47,32 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error {
case thingRemove:
rte := decodeRemoveThing(msg)
err = es.svc.RemoveConfigHandler(ctx, rte.id)
case thingConnect:
cte := decodeConnectThing(msg)
if cte.channelID == "" || len(cte.thingIDs) == 0 {
return svcerr.ErrMalformedEntity
}
for _, thingID := range cte.thingIDs {
if thingID == "" {
return svcerr.ErrMalformedEntity
}
if err := es.svc.ConnectThingHandler(ctx, cte.channelID, thingID); err != nil {
return err
}
}
case thingDisconnect:
dte := decodeDisconnectThing(msg)
err = es.svc.DisconnectThingHandler(ctx, dte.channelID, dte.thingID)
if dte.channelID == "" || len(dte.thingIDs) == 0 {
return svcerr.ErrMalformedEntity
}
for _, thingID := range dte.thingIDs {
if thingID == "" {
return svcerr.ErrMalformedEntity
}
if err = es.svc.DisconnectThingHandler(ctx, dte.channelID, thingID); err != nil {
return err
}
}
case channelUpdate:
uce := decodeUpdateChannel(msg)
err = es.handleUpdateChannel(ctx, uce)
@@ -87,10 +115,24 @@ func decodeRemoveChannel(event map[string]interface{}) removeEvent {
}
}
func decodeDisconnectThing(event map[string]interface{}) disconnectEvent {
return disconnectEvent{
channelID: read(event, "chan_id", ""),
thingID: read(event, "thing_id", ""),
func decodeConnectThing(event map[string]interface{}) connectionEvent {
if read(event, "memberKind", "") != memberKind && read(event, "relation", "") != relation {
return connectionEvent{}
}
return connectionEvent{
channelID: read(event, "group_id", ""),
thingIDs: ReadStringSlice(event, "member_ids"),
}
}
func decodeDisconnectThing(event map[string]interface{}) connectionEvent {
if read(event, "memberKind", "") != memberKind && read(event, "relation", "") != relation {
return connectionEvent{}
}
return connectionEvent{
channelID: read(event, "group_id", ""),
thingIDs: ReadStringSlice(event, "member_ids"),
}
}
@@ -114,6 +156,25 @@ func read(event map[string]interface{}, key, def string) string {
return val
}
// ReadStringSlice reads string slice from event map.
// If value is not a string slice, returns empty slice.
func ReadStringSlice(event map[string]interface{}, key string) []string {
var res []string
vals, ok := event[key].([]interface{})
if !ok {
return res
}
for _, v := range vals {
if s, ok := v.(string); ok {
res = append(res, s)
}
}
return res
}
func readTime(event map[string]interface{}, key string, def time.Time) time.Time {
val, ok := event[key].(time.Time)
if !ok {
+14
View File
@@ -23,6 +23,7 @@ const (
thingBootstrap = thingPrefix + "bootstrap"
thingStateChange = thingPrefix + "change_state"
thingUpdateConnections = thingPrefix + "update_connections"
thingConnect = thingPrefix + "connect"
thingDisconnect = thingPrefix + "disconnect"
channelPrefix = "group."
@@ -276,6 +277,19 @@ func (uche updateChannelHandlerEvent) Encode() (map[string]interface{}, error) {
return val, nil
}
type connectThingEvent struct {
thingID string
channelID string
}
func (cte connectThingEvent) Encode() (map[string]interface{}, error) {
return map[string]interface{}{
"thing_id": cte.thingID,
"channel_id": cte.channelID,
"operation": thingConnect,
}, nil
}
type disconnectThingEvent struct {
thingID string
channelID string
+15 -2
View File
@@ -207,14 +207,27 @@ func (es *eventStore) UpdateChannelHandler(ctx context.Context, channel bootstra
return es.Publish(ctx, ev)
}
func (es *eventStore) ConnectThingHandler(ctx context.Context, channelID, thingID string) error {
if err := es.svc.ConnectThingHandler(ctx, channelID, thingID); err != nil {
return err
}
ev := connectThingEvent{
thingID: thingID,
channelID: channelID,
}
return es.Publish(ctx, ev)
}
func (es *eventStore) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error {
if err := es.svc.DisconnectThingHandler(ctx, channelID, thingID); err != nil {
return err
}
ev := disconnectThingEvent{
thingID,
channelID,
thingID: thingID,
channelID: channelID,
}
return es.Publish(ctx, ev)
+83 -1
View File
@@ -48,6 +48,7 @@ const (
thingBootstrap = thingPrefix + "bootstrap"
thingStateChange = thingPrefix + "change_state"
thingUpdateConnections = thingPrefix + "update_connections"
thingConnect = thingPrefix + "connect"
thingDisconnect = thingPrefix + "disconnect"
channelPrefix = "group."
@@ -1039,6 +1040,87 @@ func TestRemoveConfigHandler(t *testing.T) {
}
}
func TestConnectThingHandler(t *testing.T) {
err := redisClient.FlushAll(context.Background()).Err()
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
svc, boot, _, _ := newService(t, redisURL)
err = redisClient.FlushAll(context.Background()).Err()
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
cases := []struct {
desc string
channelID string
thingID string
err error
event map[string]interface{}
}{
{
desc: "connect thing handler successfully",
channelID: channel.ID,
thingID: "1",
err: nil,
event: map[string]interface{}{
"channel_id": channel.ID,
"thing_id": "1",
"operation": thingConnect,
"timestamp": time.Now().UnixNano(),
"occurred_at": time.Now().UnixNano(),
},
},
{
desc: "add non-existing channel handler",
channelID: "unknown",
err: nil,
event: nil,
},
{
desc: "add channel handler with empty ID",
channelID: "",
err: nil,
event: nil,
},
{
desc: "add channel handler successfully",
channelID: channel.ID,
thingID: "1",
err: nil,
event: map[string]interface{}{
"channel_id": channel.ID,
"thing_id": "1",
"operation": thingConnect,
"timestamp": time.Now().UnixNano(),
"occurred_at": time.Now().UnixNano(),
},
},
}
lastID := "0"
for _, tc := range cases {
repoCall := boot.On("ConnectThing", context.Background(), tc.channelID, tc.thingID).Return(tc.err)
err := svc.ConnectThingHandler(context.Background(), tc.channelID, tc.thingID)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
streams := redisClient.XRead(context.Background(), &redis.XReadArgs{
Streams: []string{streamID, lastID},
Count: 1,
Block: time.Second,
}).Val()
var event map[string]interface{}
if len(streams) > 0 && len(streams[0].Messages) > 0 {
msg := streams[0].Messages[0]
event = msg.Values
event["timestamp"] = msg.ID
lastID = msg.ID
}
test(t, tc.event, event, tc.desc)
repoCall.Unset()
}
}
func TestDisconnectThingHandler(t *testing.T) {
err := redisClient.FlushAll(context.Background()).Err()
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
@@ -1097,7 +1179,7 @@ func TestDisconnectThingHandler(t *testing.T) {
lastID := "0"
for _, tc := range cases {
repoCall := boot.On("DisconnectThing", context.Background(), mock.Anything, mock.Anything).Return(tc.err)
repoCall := boot.On("DisconnectThing", context.Background(), tc.channelID, tc.thingID).Return(tc.err)
err := svc.DisconnectThingHandler(context.Background(), tc.channelID, tc.thingID)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
+18
View File
@@ -35,6 +35,24 @@ func (_m *ConfigRepository) ChangeState(ctx context.Context, owner string, id st
return r0
}
// ConnectThing provides a mock function with given fields: ctx, channelID, thingID
func (_m *ConfigRepository) ConnectThing(ctx context.Context, channelID string, thingID string) error {
ret := _m.Called(ctx, channelID, thingID)
if len(ret) == 0 {
panic("no return value specified for ConnectThing")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok {
r0 = rf(ctx, channelID, thingID)
} else {
r0 = ret.Error(0)
}
return r0
}
// DisconnectThing provides a mock function with given fields: ctx, channelID, thingID
func (_m *ConfigRepository) DisconnectThing(ctx context.Context, channelID string, thingID string) error {
ret := _m.Called(ctx, channelID, thingID)
+22 -4
View File
@@ -91,9 +91,27 @@ func (_m *Service) ChangeState(ctx context.Context, token string, id string, sta
return r0
}
// DisconnectThingHandler provides a mock function with given fields: ctx, channelID, thingID
func (_m *Service) DisconnectThingHandler(ctx context.Context, channelID string, thingID string) error {
ret := _m.Called(ctx, channelID, thingID)
// ConnectThingHandler provides a mock function with given fields: ctx, channelID, ThingID
func (_m *Service) ConnectThingHandler(ctx context.Context, channelID string, ThingID string) error {
ret := _m.Called(ctx, channelID, ThingID)
if len(ret) == 0 {
panic("no return value specified for ConnectThingHandler")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok {
r0 = rf(ctx, channelID, ThingID)
} else {
r0 = ret.Error(0)
}
return r0
}
// DisconnectThingHandler provides a mock function with given fields: ctx, channelID, ThingID
func (_m *Service) DisconnectThingHandler(ctx context.Context, channelID string, ThingID string) error {
ret := _m.Called(ctx, channelID, ThingID)
if len(ret) == 0 {
panic("no return value specified for DisconnectThingHandler")
@@ -101,7 +119,7 @@ func (_m *Service) DisconnectThingHandler(ctx context.Context, channelID string,
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok {
r0 = rf(ctx, channelID, thingID)
r0 = rf(ctx, channelID, ThingID)
} else {
r0 = ret.Error(0)
}
+43 -19
View File
@@ -28,6 +28,7 @@ var (
errSaveConnections = errors.New("failed to insert connections to database")
errUpdateChannels = errors.New("failed to update channels in bootstrap configuration database")
errRemoveChannels = errors.New("failed to remove channels from bootstrap configuration in database")
errConnectThing = errors.New("failed to connect thing in bootstrap configuration in database")
errDisconnectThing = errors.New("failed to disconnect thing in bootstrap configuration in database")
)
@@ -47,42 +48,43 @@ func NewConfigRepository(db postgres.Database, log *slog.Logger) bootstrap.Confi
return &configRepository{db: db, log: log}
}
func (cr configRepository) Save(ctx context.Context, cfg bootstrap.Config, chsConnIDs []string) (string, error) {
func (cr configRepository) Save(ctx context.Context, cfg bootstrap.Config, chsConnIDs []string) (thingID string, err error) {
q := `INSERT INTO configs (magistrala_thing, owner, name, client_cert, client_key, ca_cert, magistrala_key, external_id, external_key, content, state)
VALUES (:magistrala_thing, :owner, :name, :client_cert, :client_key, :ca_cert, :magistrala_key, :external_id, :external_key, :content, :state)`
VALUES (:magistrala_thing, :owner, :name, :client_cert, :client_key, :ca_cert, :magistrala_key, :external_id, :external_key, :content, :state)`
tx, err := cr.db.BeginTxx(ctx, nil)
if err != nil {
return "", errors.Wrap(repoerr.ErrCreateEntity, err)
}
dbcfg := toDBConfig(cfg)
if _, err := tx.NamedExec(q, dbcfg); err != nil {
e := err
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == pgerrcode.UniqueViolation {
e = repoerr.ErrConflict
defer func() {
if err != nil {
err = cr.rollback(err, tx)
}
}()
cr.rollback("Failed to insert a Config", tx)
return "", errors.Wrap(repoerr.ErrCreateEntity, e)
if _, err := tx.NamedExec(q, dbcfg); err != nil {
switch pgErr := err.(type) {
case *pgconn.PgError:
if pgErr.Code == pgerrcode.UniqueViolation {
return "", repoerr.ErrConflict
}
}
return "", err
}
if err := insertChannels(ctx, cfg.Owner, cfg.Channels, tx); err != nil {
cr.rollback("Failed to insert Channels", tx)
return "", errors.Wrap(errSaveChannels, err)
}
if err := insertConnections(ctx, cfg, chsConnIDs, tx); err != nil {
cr.rollback("Failed to insert connections", tx)
return "", errors.Wrap(errSaveConnections, err)
}
if err := tx.Commit(); err != nil {
cr.rollback("Failed to commit Config save", tx)
return "", err
}
return cfg.ThingID, nil
}
@@ -313,8 +315,13 @@ func (cr configRepository) UpdateConnections(ctx context.Context, owner, id stri
return errors.Wrap(repoerr.ErrUpdateEntity, err)
}
defer func() {
if err != nil {
err = cr.rollback(err, tx)
}
}()
if err := insertChannels(ctx, owner, channels, tx); err != nil {
cr.rollback("Failed to insert Channels during the update", tx)
return errors.Wrap(repoerr.ErrUpdateEntity, err)
}
@@ -324,12 +331,10 @@ func (cr configRepository) UpdateConnections(ctx context.Context, owner, id stri
return repoerr.ErrNotFound
}
}
cr.rollback("Failed to update connections during the update", tx)
return errors.Wrap(repoerr.ErrUpdateEntity, err)
}
if err := tx.Commit(); err != nil {
cr.rollback("Failed to commit Config update", tx)
return errors.Wrap(repoerr.ErrUpdateEntity, err)
}
@@ -451,12 +456,29 @@ func (cr configRepository) RemoveChannel(ctx context.Context, id string) error {
return nil
}
func (cr configRepository) ConnectThing(ctx context.Context, channelID, thingID string) error {
q := `UPDATE configs SET state = $1 WHERE EXISTS (
SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)`
result, err := cr.db.ExecContext(ctx, q, bootstrap.Active, thingID, channelID)
if err != nil {
return errors.Wrap(errConnectThing, err)
}
if rows, _ := result.RowsAffected(); rows == 0 {
return repoerr.ErrNotFound
}
return nil
}
func (cr configRepository) DisconnectThing(ctx context.Context, channelID, thingID string) error {
q := `UPDATE configs SET state = $1 WHERE EXISTS (
SELECT 1 FROM connections WHERE config_id = $2 AND channel_id = $3)`
if _, err := cr.db.ExecContext(ctx, q, bootstrap.Inactive, thingID, channelID); err != nil {
result, err := cr.db.ExecContext(ctx, q, bootstrap.Inactive, thingID, channelID)
if err != nil {
return errors.Wrap(errDisconnectThing, err)
}
if rows, _ := result.RowsAffected(); rows == 0 {
return repoerr.ErrNotFound
}
return nil
}
@@ -483,10 +505,12 @@ func (cr configRepository) retrieveAll(owner string, filter bootstrap.Filter) (s
return fmt.Sprintf(template, f), params
}
func (cr configRepository) rollback(content string, tx *sqlx.Tx) {
func (cr configRepository) rollback(defErr error, tx *sqlx.Tx) error {
if err := tx.Rollback(); err != nil {
cr.log.Error(fmt.Sprintf("Failed to rollback due to %s", err))
return errors.Wrap(defErr, errors.Wrap(errors.New("failed to rollback"), err))
}
return defErr
}
func insertChannels(_ context.Context, owner string, channels []bootstrap.Channel, tx *sqlx.Tx) error {
+175 -3
View File
@@ -11,6 +11,7 @@ import (
"github.com/absmach/magistrala/bootstrap"
"github.com/absmach/magistrala/bootstrap/postgres"
"github.com/absmach/magistrala/internal/testsutil"
"github.com/absmach/magistrala/pkg/errors"
repoerr "github.com/absmach/magistrala/pkg/errors/repository"
"github.com/gofrs/uuid"
@@ -675,6 +676,107 @@ func TestRemoveChannel(t *testing.T) {
assert.NotContains(t, cfg.Channels, c.Channels[0], fmt.Sprintf("expected to remove channel %s from %s", c.Channels[0], cfg.Channels))
}
func TestConnectThing(t *testing.T) {
repo := postgres.NewConfigRepository(db, testLog)
err := deleteChannels(context.Background(), repo)
require.Nil(t, err, "Channels cleanup expected to succeed.")
c := config
// Use UUID to prevent conflicts.
uid, err := uuid.NewV4()
assert.Nil(t, err, fmt.Sprintf("Got unexpected error: %s.\n", err))
c.ThingKey = uid.String()
c.ThingID = uid.String()
c.ExternalID = uid.String()
c.ExternalKey = uid.String()
c.State = bootstrap.Inactive
saved, err := repo.Save(context.Background(), c, channels)
assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err))
wrongID := testsutil.GenerateUUID(&testing.T{})
connectedThing := c
connectedThing.State = bootstrap.Active
randomThing := c
randomThingID, err := uuid.NewV4()
randomThing.ThingID = randomThingID.String()
emptyThing := c
emptyThing.ThingID = ""
emptyThing.ThingKey = ""
emptyThing.ExternalID = ""
emptyThing.ExternalKey = ""
emptyThing.Channels = []bootstrap.Channel{}
cases := []struct {
desc string
owner string
id string
channels []bootstrap.Channel
connections []string
err error
}{
{
desc: "connect disconnected thing",
owner: config.Owner,
id: saved,
channels: c.Channels,
connections: channels,
err: nil,
},
{
desc: "connect already connected thing",
owner: config.Owner,
id: connectedThing.ThingID,
channels: c.Channels,
connections: channels,
err: nil,
},
{
desc: "connect non-existent thing",
owner: config.Owner,
id: wrongID,
channels: c.Channels,
connections: channels,
err: repoerr.ErrNotFound,
},
{
desc: "connect random thing",
owner: config.Owner,
id: randomThing.ThingID,
channels: c.Channels,
connections: channels,
err: repoerr.ErrNotFound,
},
{
desc: "connect empty thing",
owner: config.Owner,
id: emptyThing.ThingID,
channels: c.Channels,
connections: channels,
err: repoerr.ErrNotFound,
},
}
for _, tc := range cases {
for i, ch := range tc.channels {
if i == 0 {
err = repo.ConnectThing(context.Background(), ch.ID, tc.id)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: Expected error: %s, got: %s.\n", tc.desc, tc.err, err))
cfg, err := repo.RetrieveByID(context.Background(), c.Owner, c.ThingID)
assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err))
assert.Equal(t, cfg.State, bootstrap.Active, fmt.Sprintf("expected to be active when a connection is added from %s", cfg))
} else {
_ = repo.ConnectThing(context.Background(), ch.ID, tc.id)
}
}
cfg, err := repo.RetrieveByID(context.Background(), c.Owner, c.ThingID)
assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err))
assert.Equal(t, cfg.State, bootstrap.Active, fmt.Sprintf("expected to be active when a connection is added from %s", cfg))
}
}
func TestDisconnectThing(t *testing.T) {
repo := postgres.NewConfigRepository(db, testLog)
err := deleteChannels(context.Background(), repo)
@@ -688,15 +790,85 @@ func TestDisconnectThing(t *testing.T) {
c.ThingID = uid.String()
c.ExternalID = uid.String()
c.ExternalKey = uid.String()
c.State = bootstrap.Inactive
saved, err := repo.Save(context.Background(), c, channels)
assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err))
err = repo.DisconnectThing(context.Background(), c.Channels[0].ID, saved)
assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err))
wrongID := testsutil.GenerateUUID(&testing.T{})
connectedThing := c
connectedThing.State = bootstrap.Active
randomThing := c
randomThingID, err := uuid.NewV4()
randomThing.ThingID = randomThingID.String()
emptyThing := c
emptyThing.ThingID = ""
emptyThing.ThingKey = ""
emptyThing.ExternalID = ""
emptyThing.ExternalKey = ""
cases := []struct {
desc string
owner string
id string
channels []bootstrap.Channel
connections []string
err error
}{
{
desc: "disconnect connected thing",
owner: config.Owner,
id: connectedThing.ThingID,
channels: c.Channels,
connections: channels,
err: nil,
},
{
desc: "disconnect already disconnected thing",
owner: config.Owner,
id: saved,
channels: c.Channels,
connections: channels,
err: nil,
},
{
desc: "disconnect invalid thing",
owner: config.Owner,
id: wrongID,
channels: c.Channels,
connections: channels,
err: repoerr.ErrNotFound,
},
{
desc: "disconnect random thing",
owner: config.Owner,
id: randomThing.ThingID,
channels: c.Channels,
connections: channels,
err: repoerr.ErrNotFound,
},
{
desc: "disconnect empty thing",
owner: config.Owner,
id: emptyThing.ThingID,
channels: c.Channels,
connections: channels,
err: repoerr.ErrNotFound,
},
}
for _, tc := range cases {
for _, ch := range tc.channels {
err = repo.DisconnectThing(context.Background(), ch.ID, tc.id)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: Expected error: %s, got: %s.\n", tc.desc, tc.err, err))
}
}
cfg, err := repo.RetrieveByID(context.Background(), c.Owner, c.ThingID)
assert.Nil(t, err, fmt.Sprintf("Retrieving config expected to succeed: %s.\n", err))
assert.Equal(t, cfg.State, bootstrap.Inactive, fmt.Sprintf("expected ti be inactive when a connection is removed from %s", cfg))
assert.Equal(t, cfg.State, bootstrap.Inactive, fmt.Sprintf("expected to be inactive when a connection is removed from %s", cfg))
}
func deleteChannels(ctx context.Context, repo bootstrap.ConfigRepository) error {
+13 -2
View File
@@ -41,6 +41,7 @@ var (
errRemoveConfig = errors.New("failed to remove bootstrap configuration")
errRemoveChannel = errors.New("failed to remove channel")
errCreateThing = errors.New("failed to create thing")
errConnectThing = errors.New("failed to connect thing")
errDisconnectThing = errors.New("failed to disconnect thing")
errCheckChannels = errors.New("failed to check if channels exists")
errConnectionChannels = errors.New("failed to check channels connections")
@@ -96,8 +97,11 @@ type Service interface {
// RemoveChannelHandler removes Channel with id received from an event.
RemoveChannelHandler(ctx context.Context, id string) error
// DisconnectHandler changes state of the Config when connect/disconnect event occurs.
DisconnectThingHandler(ctx context.Context, channelID, thingID string) error
// ConnectThingHandler changes state of the Config to active when connect event occurs.
ConnectThingHandler(ctx context.Context, channelID, ThingID string) error
// DisconnectThingHandler changes state of the Config to inactive when disconnect event occurs.
DisconnectThingHandler(ctx context.Context, channelID, ThingID string) error
}
// ConfigReader is used to parse Config into format which will be encoded
@@ -373,6 +377,13 @@ func (bs bootstrapService) RemoveChannelHandler(ctx context.Context, id string)
return nil
}
func (bs bootstrapService) ConnectThingHandler(ctx context.Context, channelID, thingID string) error {
if err := bs.configs.ConnectThing(ctx, channelID, thingID); err != nil {
return errors.Wrap(errConnectThing, err)
}
return nil
}
func (bs bootstrapService) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error {
if err := bs.configs.DisconnectThing(ctx, channelID, thingID); err != nil {
return errors.Wrap(errDisconnectThing, err)
+75 -177
View File
@@ -144,20 +144,7 @@ func TestAdd(t *testing.T) {
}
func TestView(t *testing.T) {
svc, boot, auth, sdk := newService()
repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil)
repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil)
repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil)
repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil)
repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil)
saved, err := svc.Add(context.Background(), validToken, config)
assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err))
repoCall.Unset()
repoCall1.Unset()
repoCall2.Unset()
repoCall3.Unset()
repoCall4.Unset()
svc, boot, auth, _ := newService()
cases := []struct {
desc string
id string
@@ -166,7 +153,7 @@ func TestView(t *testing.T) {
}{
{
desc: "view an existing config",
id: saved.ThingID,
id: config.ThingID,
token: validToken,
err: nil,
},
@@ -195,27 +182,14 @@ func TestView(t *testing.T) {
}
func TestUpdate(t *testing.T) {
svc, boot, auth, sdk := newService()
svc, boot, auth, _ := newService()
c := config
ch := channel
ch.ID = "2"
c.Channels = append(c.Channels, ch)
repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil)
repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil)
repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil)
repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil)
repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil)
saved, err := svc.Add(context.Background(), validToken, c)
assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err))
repoCall.Unset()
repoCall1.Unset()
repoCall2.Unset()
repoCall3.Unset()
repoCall4.Unset()
modifiedCreated := saved
modifiedCreated := c
modifiedCreated.Content = "new-config"
modifiedCreated.Name = "new name"
@@ -242,7 +216,7 @@ func TestUpdate(t *testing.T) {
},
{
desc: "update a config with wrong credentials",
config: saved,
config: c,
token: invalidToken,
err: svcerr.ErrAuthentication,
},
@@ -259,25 +233,9 @@ func TestUpdate(t *testing.T) {
}
func TestUpdateCert(t *testing.T) {
svc, boot, auth, sdk := newService()
svc, boot, auth, _ := newService()
c := config
ch := channel
ch.ID = "2"
c.Channels = append(c.Channels, ch)
repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil)
repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil)
repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil)
repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil)
repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil)
saved, err := svc.Add(context.Background(), validToken, c)
assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err))
repoCall.Unset()
repoCall1.Unset()
repoCall2.Unset()
repoCall3.Unset()
repoCall4.Unset()
cases := []struct {
desc string
token string
@@ -290,21 +248,21 @@ func TestUpdateCert(t *testing.T) {
}{
{
desc: "update certs for the valid config",
thingID: saved.ThingID,
thingID: c.ThingID,
clientCert: "newCert",
clientKey: "newKey",
caCert: "newCert",
token: validToken,
expectedConfig: bootstrap.Config{
Name: saved.Name,
ThingKey: saved.ThingKey,
Channels: saved.Channels,
ExternalID: saved.ExternalID,
ExternalKey: saved.ExternalKey,
Content: saved.Content,
State: saved.State,
Owner: saved.Owner,
ThingID: saved.ThingID,
Name: c.Name,
ThingKey: c.ThingKey,
Channels: c.Channels,
ExternalID: c.ExternalID,
ExternalKey: c.ExternalKey,
Content: c.Content,
State: c.State,
Owner: c.Owner,
ThingID: c.ThingID,
ClientCert: "newCert",
CACert: "newCert",
ClientKey: "newKey",
@@ -323,7 +281,7 @@ func TestUpdateCert(t *testing.T) {
},
{
desc: "update config cert with wrong credentials",
thingID: saved.ThingID,
thingID: c.ThingID,
clientCert: "newCert",
clientKey: "newKey",
caCert: "newCert",
@@ -584,20 +542,7 @@ func TestList(t *testing.T) {
}
func TestRemove(t *testing.T) {
svc, boot, auth, sdk := newService()
repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil)
repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil)
repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil)
repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil)
repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil)
saved, err := svc.Add(context.Background(), validToken, config)
assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err))
repoCall.Unset()
repoCall1.Unset()
repoCall2.Unset()
repoCall3.Unset()
repoCall4.Unset()
svc, boot, auth, _ := newService()
cases := []struct {
desc string
id string
@@ -606,19 +551,19 @@ func TestRemove(t *testing.T) {
}{
{
desc: "view a config with wrong credentials",
id: saved.ThingID,
id: config.ThingID,
token: invalidToken,
err: svcerr.ErrAuthentication,
},
{
desc: "remove an existing config",
id: saved.ThingID,
id: config.ThingID,
token: validToken,
err: nil,
},
{
desc: "remove removed config",
id: saved.ThingID,
id: config.ThingID,
token: validToken,
err: nil,
},
@@ -641,21 +586,8 @@ func TestRemove(t *testing.T) {
}
func TestBootstrap(t *testing.T) {
svc, boot, auth, sdk := newService()
repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil)
repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil)
repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil)
repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil)
repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil)
saved, err := svc.Add(context.Background(), validToken, config)
assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err))
repoCall.Unset()
repoCall1.Unset()
repoCall2.Unset()
repoCall3.Unset()
repoCall4.Unset()
e, err := enc([]byte(saved.ExternalKey))
svc, boot, _, _ := newService()
e, err := enc([]byte(config.ExternalKey))
assert.Nil(t, err, fmt.Sprintf("Encrypting external key expected to succeed: %s.\n", err))
cases := []struct {
@@ -670,30 +602,30 @@ func TestBootstrap(t *testing.T) {
desc: "bootstrap using invalid external id",
config: bootstrap.Config{},
externalID: "invalid",
externalKey: saved.ExternalKey,
externalKey: config.ExternalKey,
err: svcerr.ErrNotFound,
encrypted: false,
},
{
desc: "bootstrap using invalid external key",
config: bootstrap.Config{},
externalID: saved.ExternalID,
externalID: config.ExternalID,
externalKey: "invalid",
err: bootstrap.ErrExternalKey,
encrypted: false,
},
{
desc: "bootstrap an existing config",
config: saved,
externalID: saved.ExternalID,
externalKey: saved.ExternalKey,
config: config,
externalID: config.ExternalID,
externalKey: config.ExternalKey,
err: nil,
encrypted: false,
},
{
desc: "bootstrap encrypted",
config: saved,
externalID: saved.ExternalID,
config: config,
externalID: config.ExternalID,
externalKey: hex.EncodeToString(e),
err: nil,
encrypted: true,
@@ -711,19 +643,6 @@ func TestBootstrap(t *testing.T) {
func TestChangeState(t *testing.T) {
svc, boot, auth, sdk := newService()
repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil)
repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil)
repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(toGroup(config.Channels[0]), nil)
repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil)
repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil)
saved, err := svc.Add(context.Background(), validToken, config)
assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err))
repoCall.Unset()
repoCall1.Unset()
repoCall2.Unset()
repoCall3.Unset()
repoCall4.Unset()
cases := []struct {
desc string
state bootstrap.State
@@ -734,7 +653,7 @@ func TestChangeState(t *testing.T) {
{
desc: "change state with wrong credentials",
state: bootstrap.Active,
id: saved.ThingID,
id: config.ThingID,
token: invalidToken,
err: svcerr.ErrAuthentication,
},
@@ -748,21 +667,21 @@ func TestChangeState(t *testing.T) {
{
desc: "change state to Active",
state: bootstrap.Active,
id: saved.ThingID,
id: config.ThingID,
token: validToken,
err: nil,
},
{
desc: "change state to current state",
state: bootstrap.Active,
id: saved.ThingID,
id: config.ThingID,
token: validToken,
err: nil,
},
{
desc: "change state to Inactive",
state: bootstrap.Inactive,
id: saved.ThingID,
id: config.ThingID,
token: validToken,
err: nil,
},
@@ -785,19 +704,7 @@ func TestChangeState(t *testing.T) {
}
func TestUpdateChannelHandler(t *testing.T) {
svc, boot, auth, sdk := newService()
repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil)
repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil)
repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil)
repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil)
repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil)
_, err := svc.Add(context.Background(), validToken, config)
assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err))
repoCall.Unset()
repoCall1.Unset()
repoCall2.Unset()
repoCall3.Unset()
repoCall4.Unset()
svc, boot, _, _ := newService()
ch := bootstrap.Channel{
ID: channel.ID,
Name: "new name",
@@ -830,20 +737,7 @@ func TestUpdateChannelHandler(t *testing.T) {
}
func TestRemoveChannelHandler(t *testing.T) {
svc, boot, auth, sdk := newService()
repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil)
repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil)
repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil)
repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil)
repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil)
_, err := svc.Add(context.Background(), validToken, config)
assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err))
repoCall.Unset()
repoCall1.Unset()
repoCall2.Unset()
repoCall3.Unset()
repoCall4.Unset()
svc, boot, _, _ := newService()
cases := []struct {
desc string
id string
@@ -851,7 +745,7 @@ func TestRemoveChannelHandler(t *testing.T) {
}{
{
desc: "remove an existing channel",
id: channel.ID,
id: config.Channels[0].ID,
err: nil,
},
{
@@ -869,21 +763,8 @@ func TestRemoveChannelHandler(t *testing.T) {
}
}
func TestRemoveCoinfigHandler(t *testing.T) {
svc, boot, auth, sdk := newService()
repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil)
repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil)
repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil)
repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil)
repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil)
saved, err := svc.Add(context.Background(), validToken, config)
assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err))
repoCall.Unset()
repoCall1.Unset()
repoCall2.Unset()
repoCall3.Unset()
repoCall4.Unset()
func TestRemoveConfigHandler(t *testing.T) {
svc, boot, _, _ := newService()
cases := []struct {
desc string
id string
@@ -891,7 +772,7 @@ func TestRemoveCoinfigHandler(t *testing.T) {
}{
{
desc: "remove an existing config",
id: saved.ThingID,
id: config.ThingID,
err: nil,
},
{
@@ -909,21 +790,38 @@ func TestRemoveCoinfigHandler(t *testing.T) {
}
}
func TestDisconnectThingsHandler(t *testing.T) {
svc, boot, auth, sdk := newService()
repoCall := auth.On("Identify", mock.Anything, &magistrala.IdentityReq{Token: validToken}).Return(&magistrala.IdentityRes{Id: validID}, nil)
repoCall1 := sdk.On("Thing", mock.Anything, mock.Anything).Return(mgsdk.Thing{ID: config.ThingID, Credentials: mgsdk.Credentials{Secret: config.ThingKey}}, nil)
repoCall2 := sdk.On("Channel", mock.Anything, mock.Anything).Return(mgsdk.Channel{}, nil)
repoCall3 := boot.On("ListExisting", context.Background(), mock.Anything, mock.Anything, mock.Anything).Return(config.Channels, nil)
repoCall4 := boot.On("Save", context.Background(), mock.Anything, mock.Anything).Return(mock.Anything, nil)
saved, err := svc.Add(context.Background(), validToken, config)
assert.Nil(t, err, fmt.Sprintf("Saving config expected to succeed: %s.\n", err))
repoCall.Unset()
repoCall1.Unset()
repoCall2.Unset()
repoCall3.Unset()
repoCall4.Unset()
func TestConnectThingsHandler(t *testing.T) {
svc, boot, _, _ := newService()
cases := []struct {
desc string
thingID string
channelID string
err error
}{
{
desc: "connect",
channelID: channel.ID,
thingID: config.ThingID,
err: nil,
},
{
desc: "connect connected",
channelID: channel.ID,
thingID: config.ThingID,
err: svcerr.ErrAddPolicies,
},
}
for _, tc := range cases {
repoCall := boot.On("ConnectThing", context.Background(), mock.Anything, mock.Anything).Return(tc.err)
err := svc.ConnectThingHandler(context.Background(), tc.channelID, tc.thingID)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
repoCall.Unset()
}
}
func TestDisconnectThingsHandler(t *testing.T) {
svc, boot, _, _ := newService()
cases := []struct {
desc string
thingID string
@@ -933,22 +831,22 @@ func TestDisconnectThingsHandler(t *testing.T) {
{
desc: "disconnect",
channelID: channel.ID,
thingID: saved.ThingID,
thingID: config.ThingID,
err: nil,
},
{
desc: "disconnect disconnected",
channelID: channel.ID,
thingID: saved.ThingID,
thingID: config.ThingID,
err: nil,
},
}
for _, tc := range cases {
repoCall := boot.On("DisconnectThing", context.Background(), mock.Anything, mock.Anything).Return(tc.err)
svcCall := boot.On("DisconnectThing", context.Background(), mock.Anything, mock.Anything).Return(tc.err)
err := svc.DisconnectThingHandler(context.Background(), tc.channelID, tc.thingID)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
repoCall.Unset()
svcCall.Unset()
}
}
+11
View File
@@ -158,6 +158,17 @@ func (tm *tracingMiddleware) RemoveChannelHandler(ctx context.Context, id string
return tm.svc.RemoveChannelHandler(ctx, id)
}
// ConnectThingHandler traces the "ConnectThingHandler" operation of the wrapped bootstrap.Service.
func (tm *tracingMiddleware) ConnectThingHandler(ctx context.Context, channelID, thingID string) error {
ctx, span := tm.tracer.Start(ctx, "svc_connect_thing_handler", trace.WithAttributes(
attribute.String("channel_id", channelID),
attribute.String("thing_id", thingID),
))
defer span.End()
return tm.svc.ConnectThingHandler(ctx, channelID, thingID)
}
// DisconnectThingHandler traces the "DisconnectThingHandler" operation of the wrapped bootstrap.Service.
func (tm *tracingMiddleware) DisconnectThingHandler(ctx context.Context, channelID, thingID string) error {
ctx, span := tm.tracer.Start(ctx, "svc_disconnect_thing_handler", trace.WithAttributes(
+12 -4
View File
@@ -39,14 +39,18 @@ var (
)
type assignEvent struct {
memberIDs []string
groupID string
memberIDs []string
relation string
memberKind string
groupID string
}
func (cge assignEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": groupAssign,
"member_ids": cge.memberIDs,
"relation": cge.relation,
"memberKind": cge.memberKind,
"group_id": cge.groupID,
}
@@ -54,14 +58,18 @@ func (cge assignEvent) Encode() (map[string]interface{}, error) {
}
type unassignEvent struct {
memberIDs []string
groupID string
memberIDs []string
relation string
memberKind string
groupID string
}
func (cge unassignEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": groupUnassign,
"member_ids": cge.memberIDs,
"relation": cge.relation,
"memberKind": cge.memberKind,
"group_id": cge.groupID,
}
+8 -4
View File
@@ -145,8 +145,10 @@ func (es eventStore) Assign(ctx context.Context, token, groupID, relation, membe
}
event := assignEvent{
groupID: groupID,
memberIDs: memberIDs,
groupID: groupID,
relation: relation,
memberKind: memberKind,
memberIDs: memberIDs,
}
if err := es.Publish(ctx, event); err != nil {
@@ -162,8 +164,10 @@ func (es eventStore) Unassign(ctx context.Context, token, groupID, relation, mem
}
event := unassignEvent{
groupID: groupID,
memberIDs: memberIDs,
groupID: groupID,
relation: relation,
memberKind: memberKind,
memberIDs: memberIDs,
}
if err := es.Publish(ctx, event); err != nil {