NOISSUE - Fix OPC-UA adapter (#2114)

Signed-off-by: WashingtonKK <washingtonkigan@gmail.com>
This commit is contained in:
Washington Kigani Kamadi
2024-04-01 13:38:12 +03:00
committed by GitHub
parent ed4824d959
commit 65941368b8
10 changed files with 237 additions and 89 deletions
+34
View File
@@ -21,9 +21,13 @@ const (
groupList = groupPrefix + "list"
groupListMemberships = groupPrefix + "list_by_user"
groupRemove = groupPrefix + "remove"
groupAssign = groupPrefix + "assign"
groupUnassign = groupPrefix + "unassign"
)
var (
_ events.Event = (*assignEvent)(nil)
_ events.Event = (*unassignEvent)(nil)
_ events.Event = (*createGroupEvent)(nil)
_ events.Event = (*updateGroupEvent)(nil)
_ events.Event = (*changeStatusGroupEvent)(nil)
@@ -34,6 +38,36 @@ var (
_ events.Event = (*listGroupMembershipEvent)(nil)
)
type assignEvent struct {
memberIDs []string
groupID string
}
func (cge assignEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": groupAssign,
"member_ids": cge.memberIDs,
"group_id": cge.groupID,
}
return val, nil
}
type unassignEvent struct {
memberIDs []string
groupID string
}
func (cge unassignEvent) Encode() (map[string]interface{}, error) {
val := map[string]interface{}{
"operation": groupUnassign,
"member_ids": cge.memberIDs,
"group_id": cge.groupID,
}
return val, nil
}
type createGroupEvent struct {
groups.Group
}
+26 -1
View File
@@ -140,10 +140,35 @@ func (es eventStore) EnableGroup(ctx context.Context, token, id string) (groups.
}
func (es eventStore) Assign(ctx context.Context, token, groupID, relation, memberKind string, memberIDs ...string) error {
return es.svc.Assign(ctx, token, groupID, relation, memberKind, memberIDs...)
if err := es.svc.Assign(ctx, token, groupID, relation, memberKind, memberIDs...); err != nil {
return err
}
event := assignEvent{
groupID: groupID,
memberIDs: memberIDs,
}
if err := es.Publish(ctx, event); err != nil {
return err
}
return nil
}
func (es eventStore) Unassign(ctx context.Context, token, groupID, relation, memberKind string, memberIDs ...string) error {
if err := es.svc.Unassign(ctx, token, groupID, relation, memberKind, memberIDs...); err != nil {
return err
}
event := unassignEvent{
groupID: groupID,
memberIDs: memberIDs,
}
if err := es.Publish(ctx, event); err != nil {
return err
}
return es.svc.Unassign(ctx, token, groupID, relation, memberKind, memberIDs...)
}
+88 -35
View File
@@ -5,8 +5,11 @@ package opcua
import (
"context"
"encoding/base64"
"fmt"
"log/slog"
"regexp"
"strconv"
"github.com/absmach/magistrala/opcua/db"
)
@@ -33,27 +36,30 @@ type Service interface {
RemoveChannel(ctx context.Context, chanID string) error
// ConnectThing creates thingID:channelID route-map
ConnectThing(ctx context.Context, chanID, thingID string) error
ConnectThing(ctx context.Context, chanID string, thingIDs []string) error
// DisconnectThing removes thingID:channelID route-map
DisconnectThing(ctx context.Context, chanID, thingID string) error
DisconnectThing(ctx context.Context, chanID string, thingIDs []string) error
// Browse browses available nodes for a given OPC-UA Server URI and NodeID
Browse(ctx context.Context, serverURI, namespace, identifier string) ([]BrowsedNode, error)
Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) ([]BrowsedNode, error)
}
// Config OPC-UA Server.
type Config struct {
ServerURI string
NodeID string
Interval string `env:"MG_OPCUA_ADAPTER_INTERVAL_MS" envDefault:"1000"`
Policy string `env:"MG_OPCUA_ADAPTER_POLICY" envDefault:""`
Mode string `env:"MG_OPCUA_ADAPTER_MODE" envDefault:""`
CertFile string `env:"MG_OPCUA_ADAPTER_CERT_FILE" envDefault:""`
KeyFile string `env:"MG_OPCUA_ADAPTER_KEY_FILE" envDefault:""`
Interval string `env:"MG_OPCUA_ADAPTER_INTERVAL_MS" envDefault:"1000"`
Policy string `env:"MG_OPCUA_ADAPTER_POLICY" envDefault:""`
Mode string `env:"MG_OPCUA_ADAPTER_MODE" envDefault:""`
CertFile string `env:"MG_OPCUA_ADAPTER_CERT_FILE" envDefault:""`
KeyFile string `env:"MG_OPCUA_ADAPTER_KEY_FILE" envDefault:""`
}
var _ Service = (*adapterService)(nil)
var (
_ Service = (*adapterService)(nil)
guidRegex = regexp.MustCompile(`^\{?[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}\}?$`)
)
type adapterService struct {
subscriber Subscriber
@@ -102,38 +108,80 @@ func (as *adapterService) RemoveChannel(ctx context.Context, chanID string) erro
return as.channelsRM.Remove(ctx, chanID)
}
func (as *adapterService) ConnectThing(ctx context.Context, chanID, thingID string) error {
func (as *adapterService) ConnectThing(ctx context.Context, chanID string, thingIDs []string) error {
serverURI, err := as.channelsRM.Get(ctx, chanID)
if err != nil {
return err
}
nodeID, err := as.thingsRM.Get(ctx, thingID)
if err != nil {
return err
}
as.cfg.NodeID = nodeID
as.cfg.ServerURI = serverURI
c := fmt.Sprintf("%s:%s", chanID, thingID)
if err := as.connectRM.Save(ctx, c, c); err != nil {
return err
}
go func() {
if err := as.subscriber.Subscribe(ctx, as.cfg); err != nil {
as.logger.Warn(fmt.Sprintf("subscription failed: %s", err))
for _, thingID := range thingIDs {
nodeID, err := as.thingsRM.Get(ctx, thingID)
if err != nil {
return err
}
}()
// Store subscription details
return db.Save(serverURI, nodeID)
as.cfg.NodeID = nodeID
as.cfg.ServerURI = serverURI
c := fmt.Sprintf("%s:%s", chanID, thingID)
if err := as.connectRM.Save(ctx, c, c); err != nil {
return err
}
go func() {
if err := as.subscriber.Subscribe(ctx, as.cfg); err != nil {
as.logger.Warn("subscription failed", slog.Any("error", err))
}
}()
// Store subscription details
if err := db.Save(serverURI, nodeID); err != nil {
return err
}
}
return nil
}
func (as *adapterService) Browse(ctx context.Context, serverURI, namespace, identifier string) ([]BrowsedNode, error) {
nodeID := fmt.Sprintf("%s;%s", namespace, identifier)
func (as *adapterService) Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) ([]BrowsedNode, error) {
idFormat := "s"
switch identifierType {
case "string":
break
case "numeric":
if _, err := strconv.Atoi(identifier); err != nil {
args := []any{
slog.String("namespace", namespace),
slog.String("identifier", identifier),
slog.Any("error", err),
}
as.logger.Warn("failed to parse numeric identifier", args...)
break
}
idFormat = "i"
case "guid":
if !guidRegex.MatchString(identifier) {
args := []any{
slog.String("namespace", namespace),
slog.String("identifier", identifier),
}
as.logger.Warn("GUID identifier has invalid format", args...)
break
}
idFormat = "g"
case "opaque":
if _, err := base64.StdEncoding.DecodeString(identifier); err != nil {
args := []any{
slog.String("namespace", namespace),
slog.String("identifier", identifier),
slog.Any("error", err),
}
as.logger.Warn("opaque identifier has invalid base64 format", args...)
break
}
idFormat = "b"
}
nodeID := fmt.Sprintf("ns=%s;%s=%s", namespace, idFormat, identifier)
nodes, err := as.browser.Browse(serverURI, nodeID)
if err != nil {
return nil, err
@@ -141,7 +189,12 @@ func (as *adapterService) Browse(ctx context.Context, serverURI, namespace, iden
return nodes, nil
}
func (as *adapterService) DisconnectThing(ctx context.Context, chanID, thingID string) error {
c := fmt.Sprintf("%s:%s", chanID, thingID)
return as.connectRM.Remove(ctx, c)
func (as *adapterService) DisconnectThing(ctx context.Context, chanID string, thingIDs []string) error {
for _, thingID := range thingIDs {
c := fmt.Sprintf("%s:%s", chanID, thingID)
if err := as.connectRM.Remove(ctx, c); err != nil {
return err
}
}
return nil
}
+1 -1
View File
@@ -20,7 +20,7 @@ func browseEndpoint(svc opcua.Service) endpoint.Endpoint {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
nodes, err := svc.Browse(ctx, req.ServerURI, req.Namespace, req.Identifier)
nodes, err := svc.Browse(ctx, req.ServerURI, req.Namespace, req.Identifier, req.IdentifierType)
if err != nil {
return nil, err
}
+9 -8
View File
@@ -134,12 +134,12 @@ func (lm loggingMiddleware) RemoveChannel(ctx context.Context, mgxChanID string)
return lm.svc.RemoveChannel(ctx, mgxChanID)
}
func (lm loggingMiddleware) ConnectThing(ctx context.Context, mgxChanID, mgxThingID string) (err error) {
func (lm loggingMiddleware) ConnectThing(ctx context.Context, mgxChanID string, mgxThingIDs []string) (err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
slog.String("channel_id", mgxChanID),
slog.String("thing_id", mgxThingID),
slog.Any("thing_ids", mgxThingIDs),
}
if err != nil {
args = append(args, slog.Any("error", err))
@@ -149,15 +149,15 @@ func (lm loggingMiddleware) ConnectThing(ctx context.Context, mgxChanID, mgxThin
lm.logger.Info("Connect thing to channel completed successfully", args...)
}(time.Now())
return lm.svc.ConnectThing(ctx, mgxChanID, mgxThingID)
return lm.svc.ConnectThing(ctx, mgxChanID, mgxThingIDs)
}
func (lm loggingMiddleware) DisconnectThing(ctx context.Context, mgxChanID, mgxThingID string) (err error) {
func (lm loggingMiddleware) DisconnectThing(ctx context.Context, mgxChanID string, mgxThingIDs []string) (err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
slog.String("channel_id", mgxChanID),
slog.String("thing_id", mgxThingID),
slog.Any("thing_ids", mgxThingIDs),
}
if err != nil {
args = append(args, slog.Any("error", err))
@@ -167,16 +167,17 @@ func (lm loggingMiddleware) DisconnectThing(ctx context.Context, mgxChanID, mgxT
lm.logger.Info("Disconnect thing from channel completed successfully", args...)
}(time.Now())
return lm.svc.DisconnectThing(ctx, mgxChanID, mgxThingID)
return lm.svc.DisconnectThing(ctx, mgxChanID, mgxThingIDs)
}
func (lm loggingMiddleware) Browse(ctx context.Context, serverURI, namespace, identifier string) (nodes []opcua.BrowsedNode, err error) {
func (lm loggingMiddleware) Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) (nodes []opcua.BrowsedNode, err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
slog.String("server_uri", serverURI),
slog.String("namespace", namespace),
slog.String("identifier", identifier),
slog.String("identifier_type", identifierType),
}
if err != nil {
args = append(args, slog.Any("error", err))
@@ -186,5 +187,5 @@ func (lm loggingMiddleware) Browse(ctx context.Context, serverURI, namespace, id
lm.logger.Info("Browse available nodes completed successfully", args...)
}(time.Now())
return lm.svc.Browse(ctx, serverURI, namespace, identifier)
return lm.svc.Browse(ctx, serverURI, namespace, identifier, identifierType)
}
+6 -6
View File
@@ -84,29 +84,29 @@ func (mm *metricsMiddleware) RemoveChannel(ctx context.Context, mgxChanID string
return mm.svc.RemoveChannel(ctx, mgxChanID)
}
func (mm *metricsMiddleware) ConnectThing(ctx context.Context, mgxChanID, mgxThingID string) error {
func (mm *metricsMiddleware) ConnectThing(ctx context.Context, mgxChanID string, mgxThingIDs []string) error {
defer func(begin time.Time) {
mm.counter.With("method", "connect_thing").Add(1)
mm.latency.With("method", "connect_thing").Observe(time.Since(begin).Seconds())
}(time.Now())
return mm.svc.ConnectThing(ctx, mgxChanID, mgxThingID)
return mm.svc.ConnectThing(ctx, mgxChanID, mgxThingIDs)
}
func (mm *metricsMiddleware) DisconnectThing(ctx context.Context, mgxChanID, mgxThingID string) error {
func (mm *metricsMiddleware) DisconnectThing(ctx context.Context, mgxChanID string, mgxThingIDs []string) error {
defer func(begin time.Time) {
mm.counter.With("method", "disconnect_thing").Add(1)
mm.latency.With("method", "disconnect_thing").Observe(time.Since(begin).Seconds())
}(time.Now())
return mm.svc.DisconnectThing(ctx, mgxChanID, mgxThingID)
return mm.svc.DisconnectThing(ctx, mgxChanID, mgxThingIDs)
}
func (mm *metricsMiddleware) Browse(ctx context.Context, serverURI, namespace, identifier string) ([]opcua.BrowsedNode, error) {
func (mm *metricsMiddleware) Browse(ctx context.Context, serverURI, namespace, identifier, identifierType string) ([]opcua.BrowsedNode, error) {
defer func(begin time.Time) {
mm.counter.With("method", "browse").Add(1)
mm.latency.With("method", "browse").Observe(time.Since(begin).Seconds())
}(time.Now())
return mm.svc.Browse(ctx, serverURI, namespace, identifier)
return mm.svc.Browse(ctx, serverURI, namespace, identifier, identifierType)
}
+4 -3
View File
@@ -6,9 +6,10 @@ package api
import "github.com/absmach/magistrala/internal/apiutil"
type browseReq struct {
ServerURI string
Namespace string
Identifier string
ServerURI string
Namespace string
Identifier string
IdentifierType string
}
func (req *browseReq) validate() error {
+16 -9
View File
@@ -19,12 +19,13 @@ import (
)
const (
contentType = "application/json"
serverParam = "server"
namespaceParam = "namespace"
identifierParam = "identifier"
defNamespace = "ns=0" // Standard root namespace
defIdentifier = "i=84" // Standard root identifier
contentType = "application/json"
serverParam = "server"
namespaceParam = "namespace"
identifierParam = "identifier"
identifierTypeParam = "identifierType"
defNamespace = "ns=0" // Standard root namespace
defIdentifier = "i=84" // Standard root identifier
)
// MakeHandler returns a HTTP handler for API endpoints.
@@ -64,15 +65,21 @@ func decodeBrowse(_ context.Context, r *http.Request) (interface{}, error) {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
iType, err := apiutil.ReadStringQuery(r, identifierTypeParam, "")
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
if n == "" || i == "" {
n = defNamespace
i = defIdentifier
}
req := browseReq{
ServerURI: s,
Namespace: n,
Identifier: i,
ServerURI: s,
Namespace: n,
Identifier: i,
IdentifierType: iType,
}
return req, nil
+2 -2
View File
@@ -13,8 +13,8 @@ type removeThingEvent struct {
}
type connectThingEvent struct {
chanID string
thingID string
chanID string
thingIDs []string
}
type createChannelEvent struct {
+51 -24
View File
@@ -5,6 +5,7 @@ package events
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
@@ -13,21 +14,21 @@ import (
)
const (
keyType = "opcua"
keyNodeID = "node_id"
keyServerURI = "server_uri"
thingPrefix = "thing."
thingCreate = thingPrefix + "create"
thingUpdate = thingPrefix + "update"
thingRemove = thingPrefix + "remove"
thingConnect = thingPrefix + "connect"
thingDisconnect = thingPrefix + "disconnect"
keyType = "opcua"
keyNodeID = "node_id"
keyServerURI = "server_uri"
channelPrefix = "group."
channelCreate = channelPrefix + "create"
channelUpdate = channelPrefix + "update"
channelRemove = channelPrefix + "remove"
thingPrefix = "thing."
thingCreate = thingPrefix + "create"
thingUpdate = thingPrefix + "update"
thingRemove = thingPrefix + "remove"
channelCreate = channelPrefix + "create"
channelUpdate = channelPrefix + "update"
channelRemove = channelPrefix + "remove"
channelConnect = channelPrefix + "assign"
channelDisconnect = channelPrefix + "unassign"
)
var (
@@ -92,12 +93,12 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error {
case channelRemove:
rce := decodeRemoveChannel(msg)
err = es.svc.RemoveChannel(ctx, rce.id)
case thingConnect:
case channelConnect:
rce := decodeConnectThing(msg)
err = es.svc.ConnectThing(ctx, rce.chanID, rce.thingID)
case thingDisconnect:
err = es.svc.ConnectThing(ctx, rce.chanID, rce.thingIDs)
case channelDisconnect:
rce := decodeDisconnectThing(msg)
err = es.svc.DisconnectThing(ctx, rce.chanID, rce.thingID)
err = es.svc.DisconnectThing(ctx, rce.chanID, rce.thingIDs)
}
if err != nil && err != errMetadataType {
return err
@@ -108,8 +109,14 @@ func (es *eventHandler) Handle(ctx context.Context, event events.Event) error {
func decodeCreateThing(event map[string]interface{}) (createThingEvent, error) {
strmeta := read(event, "metadata", "{}")
// Metadata is base64 encoded since it is marshalled as []byte.
meta, err := base64.StdEncoding.DecodeString(strmeta)
if err != nil {
return createThingEvent{}, err
}
var metadata map[string]interface{}
if err := json.Unmarshal([]byte(strmeta), &metadata); err != nil {
if err := json.Unmarshal(meta, &metadata); err != nil {
return createThingEvent{}, err
}
@@ -144,8 +151,12 @@ func decodeRemoveThing(event map[string]interface{}) removeThingEvent {
func decodeCreateChannel(event map[string]interface{}) (createChannelEvent, error) {
strmeta := read(event, "metadata", "{}")
meta, err := base64.StdEncoding.DecodeString(strmeta)
if err != nil {
return createChannelEvent{}, err
}
var metadata map[string]interface{}
if err := json.Unmarshal([]byte(strmeta), &metadata); err != nil {
if err := json.Unmarshal(meta, &metadata); err != nil {
return createChannelEvent{}, err
}
@@ -180,15 +191,15 @@ func decodeRemoveChannel(event map[string]interface{}) removeChannelEvent {
func decodeConnectThing(event map[string]interface{}) connectThingEvent {
return connectThingEvent{
chanID: read(event, "chan_id", ""),
thingID: read(event, "thing_id", ""),
chanID: read(event, "group_id", ""),
thingIDs: readMemberIDs(event, "member_ids"),
}
}
func decodeDisconnectThing(event map[string]interface{}) connectThingEvent {
return connectThingEvent{
chanID: read(event, "chan_id", ""),
thingID: read(event, "thing_id", ""),
chanID: read(event, "chan_id", ""),
thingIDs: readMemberIDs(event, "member_ids"),
}
}
@@ -200,3 +211,19 @@ func read(event map[string]interface{}, key, def string) string {
return val
}
func readMemberIDs(event map[string]interface{}, key string) []string {
var memberIDs []string
val, ok := event[key].([]interface{})
if !ok {
return memberIDs
}
for _, v := range val {
if str, ok := v.(string); ok {
memberIDs = append(memberIDs, str)
}
}
return memberIDs
}