MG-1965 - Process Event Logs (#2057)

Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com>
Signed-off-by: Rodney Osodo <socials@rodneyosodo.com>
This commit is contained in:
b1ackd0t
2024-06-27 17:38:20 +03:00
committed by GitHub
parent b0e37dacf4
commit 0794363a3c
66 changed files with 3968 additions and 447 deletions
+18 -55
View File
@@ -5,7 +5,6 @@ package consumer
import (
"context"
"encoding/json"
"time"
"github.com/absmach/magistrala/bootstrap"
@@ -69,6 +68,9 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error {
if thingID == "" {
return svcerr.ErrMalformedEntity
}
}
for _, thingID := range dte.thingIDs {
if err = es.svc.DisconnectThingHandler(ctx, dte.channelID, thingID); err != nil {
return err
}
@@ -89,50 +91,47 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error {
func decodeRemoveThing(event map[string]interface{}) removeEvent {
return removeEvent{
id: read(event, "id", ""),
id: events.Read(event, "id", ""),
}
}
func decodeUpdateChannel(event map[string]interface{}) updateChannelEvent {
strmeta := read(event, "metadata", "{}")
var metadata map[string]interface{}
if err := json.Unmarshal([]byte(strmeta), &metadata); err != nil {
metadata = map[string]interface{}{}
}
metadata := events.Read(event, "metadata", map[string]interface{}{})
return updateChannelEvent{
id: read(event, "id", ""),
name: read(event, "name", ""),
id: events.Read(event, "id", ""),
name: events.Read(event, "name", ""),
metadata: metadata,
updatedAt: readTime(event, "updated_at", time.Now()),
updatedBy: read(event, "updated_by", ""),
updatedAt: events.Read(event, "updated_at", time.Now()),
updatedBy: events.Read(event, "updated_by", ""),
}
}
func decodeRemoveChannel(event map[string]interface{}) removeEvent {
return removeEvent{
id: read(event, "id", ""),
id: events.Read(event, "id", ""),
}
}
func decodeConnectThing(event map[string]interface{}) connectionEvent {
if read(event, "memberKind", "") != memberKind && read(event, "relation", "") != relation {
if events.Read(event, "memberKind", "") != memberKind && events.Read(event, "relation", "") != relation {
return connectionEvent{}
}
return connectionEvent{
channelID: read(event, "group_id", ""),
thingIDs: ReadStringSlice(event, "member_ids"),
channelID: events.Read(event, "group_id", ""),
thingIDs: events.ReadStringSlice(event, "member_ids"),
}
}
func decodeDisconnectThing(event map[string]interface{}) connectionEvent {
if read(event, "memberKind", "") != memberKind && read(event, "relation", "") != relation {
if events.Read(event, "memberKind", "") != memberKind && events.Read(event, "relation", "") != relation {
return connectionEvent{}
}
return connectionEvent{
channelID: read(event, "group_id", ""),
thingIDs: ReadStringSlice(event, "member_ids"),
channelID: events.Read(event, "group_id", ""),
thingIDs: events.ReadStringSlice(event, "member_ids"),
}
}
@@ -144,42 +143,6 @@ func (es *eventHandler) handleUpdateChannel(ctx context.Context, uce updateChann
UpdatedAt: uce.updatedAt,
UpdatedBy: uce.updatedBy,
}
return es.svc.UpdateChannelHandler(ctx, channel)
}
func read(event map[string]interface{}, key, def string) string {
val, ok := event[key].(string)
if !ok {
return def
}
return val
}
// ReadStringSlice reads string slice from event map.
// If value is not a string slice, returns empty slice.
func ReadStringSlice(event map[string]interface{}, key string) []string {
var res []string
vals, ok := event[key].([]interface{})
if !ok {
return res
}
for _, v := range vals {
if s, ok := v.(string); ok {
res = append(res, s)
}
}
return res
}
func readTime(event map[string]interface{}, key string, def time.Time) time.Time {
val, ok := event[key].(time.Time)
if !ok {
return def
}
return val
}
+10 -40
View File
@@ -4,14 +4,12 @@
package producer
import (
"encoding/json"
"github.com/absmach/magistrala/bootstrap"
"github.com/absmach/magistrala/pkg/events"
)
const (
configPrefix = "config."
configPrefix = "bootstrap.config."
configCreate = configPrefix + "create"
configUpdate = configPrefix + "update"
configRemove = configPrefix + "remove"
@@ -19,18 +17,18 @@ const (
configList = configPrefix + "list"
configHandlerRemove = configPrefix + "remove_handler"
thingPrefix = "thing."
thingPrefix = "bootstrap.thing."
thingBootstrap = thingPrefix + "bootstrap"
thingStateChange = thingPrefix + "change_state"
thingUpdateConnections = thingPrefix + "update_connections"
thingConnect = thingPrefix + "connect"
thingDisconnect = thingPrefix + "disconnect"
channelPrefix = "group."
channelPrefix = "bootstrap.channel."
channelHandlerRemove = channelPrefix + "remove_handler"
channelUpdateHandler = channelPrefix + "update_handler"
certUpdate = "cert.update"
certUpdate = "bootstrap.cert.update"
)
var (
@@ -74,11 +72,7 @@ func (ce configEvent) Encode() (map[string]interface{}, error) {
for i, ch := range ce.Channels {
channels[i] = ch.ID
}
data, err := json.Marshal(channels)
if err != nil {
return map[string]interface{}{}, err
}
val["channels"] = string(data)
val["channels"] = channels
}
if ce.ClientCert != "" {
val["client_cert"] = ce.ClientCert
@@ -121,21 +115,11 @@ func (rce listConfigsEvent) Encode() (map[string]interface{}, error) {
"operation": configList,
}
if len(rce.fullMatch) > 0 {
data, err := json.Marshal(rce.fullMatch)
if err != nil {
return map[string]interface{}{}, err
}
val["full_match"] = data
val["full_match"] = rce.fullMatch
}
if len(rce.partialMatch) > 0 {
data, err := json.Marshal(rce.partialMatch)
if err != nil {
return map[string]interface{}{}, err
}
val["full_match"] = data
val["full_match"] = rce.partialMatch
}
return val, nil
}
@@ -173,11 +157,7 @@ func (be bootstrapEvent) Encode() (map[string]interface{}, error) {
for i, ch := range be.Channels {
channels[i] = ch.ID
}
data, err := json.Marshal(channels)
if err != nil {
return map[string]interface{}{}, err
}
val["channels"] = string(data)
val["channels"] = channels
}
if be.ClientCert != "" {
val["client_cert"] = be.ClientCert
@@ -213,14 +193,9 @@ type updateConnectionsEvent struct {
}
func (uce updateConnectionsEvent) Encode() (map[string]interface{}, error) {
data, err := json.Marshal(uce.mgChannels)
if err != nil {
return map[string]interface{}{}, err
}
return map[string]interface{}{
"thing_id": uce.mgThing,
"channels": string(data),
"channels": uce.mgChannels,
"operation": thingUpdateConnections,
}, nil
}
@@ -267,12 +242,7 @@ func (uche updateChannelHandlerEvent) Encode() (map[string]interface{}, error) {
val["name"] = uche.Name
}
if uche.Metadata != nil {
metadata, err := json.Marshal(uche.Metadata)
if err != nil {
return map[string]interface{}{}, err
}
val["metadata"] = metadata
val["metadata"] = uche.Metadata
}
return val, nil
}
+10 -20
View File
@@ -5,7 +5,6 @@ package producer_test
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
@@ -123,7 +122,7 @@ func TestAdd(t *testing.T) {
"thing_id": "1",
"owner": email,
"name": config.Name,
"channels": strings.Join(channels, ", "),
"channels": channels,
"external_id": config.ExternalID,
"content": config.Content,
"timestamp": time.Now().Unix(),
@@ -238,8 +237,6 @@ func TestUpdate(t *testing.T) {
nonExisting.ThingID = "unknown"
channels := []string{modified.Channels[0].ID, modified.Channels[1].ID}
chs, err := json.Marshal(channels)
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
cases := []struct {
desc string
@@ -258,7 +255,7 @@ func TestUpdate(t *testing.T) {
"content": modified.Content,
"timestamp": time.Now().UnixNano(),
"operation": configUpdate,
"channels": string(chs),
"channels": channels,
"external_id": modified.ExternalID,
"thing_id": modified.ThingID,
"owner": validID,
@@ -340,7 +337,7 @@ func TestUpdateConnections(t *testing.T) {
err: nil,
event: map[string]interface{}{
"thing_id": saved.ThingID,
"channels": "2",
"channels": []string{"2"},
"timestamp": time.Now().Unix(),
"operation": thingUpdateConnections,
},
@@ -1226,21 +1223,14 @@ func test(t *testing.T, expected, actual map[string]interface{}, description str
delete(actual, "occurred_at")
}
if expected["channels"] != nil || actual["channels"] != nil {
ech := expected["channels"]
ach := actual["channels"]
exchs := expected["channels"].([]interface{})
achs := actual["channels"].([]interface{})
che := []string{}
err = json.Unmarshal([]byte(ech.(string)), &che)
require.Nil(t, err, fmt.Sprintf("%s: expected to get a valid channels, got %s", description, err))
cha := []string{}
err = json.Unmarshal([]byte(ach.(string)), &cha)
require.Nil(t, err, fmt.Sprintf("%s: expected to get a valid channels, got %s", description, err))
if assert.ElementsMatchf(t, che, cha, "%s: got incorrect channels\n", description) {
delete(expected, "channels")
delete(actual, "channels")
if exchs != nil && achs != nil {
if assert.Len(t, exchs, len(achs), fmt.Sprintf("%s: got incorrect number of channels\n", description)) {
for _, exch := range exchs {
assert.Contains(t, achs, exch, fmt.Sprintf("%s: got incorrect channel\n", description))
}
}
}