mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 06:50:18 +00:00
MG-2200 - Remove Unused Errors (#2243)
Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com>
This commit is contained in:
@@ -6,7 +6,6 @@ package auth
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -42,9 +41,6 @@ const (
|
||||
Unknown = "unknown"
|
||||
)
|
||||
|
||||
// ErrStatusAlreadyAssigned indicated that the client or group has already been assigned the status.
|
||||
var ErrStatusAlreadyAssigned = errors.New("status already assigned")
|
||||
|
||||
// String converts client/group status to string literal.
|
||||
func (s Status) String() string {
|
||||
switch s {
|
||||
|
||||
@@ -184,13 +184,13 @@ func TestParse(t *testing.T) {
|
||||
desc: "parse expired key",
|
||||
key: auth.Key{},
|
||||
token: expToken,
|
||||
err: authjwt.ErrExpiry,
|
||||
err: auth.ErrExpiry,
|
||||
},
|
||||
{
|
||||
desc: "parse expired API key",
|
||||
key: apiKey,
|
||||
token: apiToken,
|
||||
err: authjwt.ErrExpiry,
|
||||
err: auth.ErrExpiry,
|
||||
},
|
||||
{
|
||||
desc: "parse token with invalid issuer",
|
||||
|
||||
@@ -20,14 +20,8 @@ var (
|
||||
errInvalidIssuer = errors.New("invalid token issuer value")
|
||||
// errJWTExpiryKey is used to check if the token is expired.
|
||||
errJWTExpiryKey = errors.New(`"exp" not satisfied`)
|
||||
// ErrExpiry indicates that the token is expired.
|
||||
ErrExpiry = errors.New("token is expired")
|
||||
// ErrSetClaim indicates an inability to set the claim.
|
||||
ErrSetClaim = errors.New("failed to set claim")
|
||||
// ErrSignJWT indicates an error in signing jwt token.
|
||||
ErrSignJWT = errors.New("failed to sign jwt token")
|
||||
// ErrParseToken indicates a failure to parse the token.
|
||||
ErrParseToken = errors.New("failed to parse token")
|
||||
// ErrValidateJWTToken indicates a failure to validate JWT token.
|
||||
ErrValidateJWTToken = errors.New("failed to validate jwt token")
|
||||
// ErrJSONHandle indicates an error in handling JSON.
|
||||
@@ -107,7 +101,7 @@ func (tok *tokenizer) validateToken(token string) (jwt.Token, error) {
|
||||
)
|
||||
if err != nil {
|
||||
if errors.Contains(err, errJWTExpiryKey) {
|
||||
return nil, ErrExpiry
|
||||
return nil, auth.ErrExpiry
|
||||
}
|
||||
|
||||
return nil, err
|
||||
|
||||
+2
-11
@@ -10,17 +10,8 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrInvalidKeyIssuedAt indicates that the Key is being used before it's issued.
|
||||
ErrInvalidKeyIssuedAt = errors.New("invalid issue time")
|
||||
|
||||
// ErrKeyExpired indicates that the Key is expired.
|
||||
ErrKeyExpired = errors.New("use of expired key")
|
||||
|
||||
// ErrAPIKeyExpired indicates that the Key is expired
|
||||
// and that the key type is API key.
|
||||
ErrAPIKeyExpired = errors.New("use of expired API key")
|
||||
)
|
||||
// ErrKeyExpired indicates that the Key is expired.
|
||||
var ErrKeyExpired = errors.New("use of expired key")
|
||||
|
||||
type Token struct {
|
||||
AccessToken string // AccessToken contains the security credentials for a login session and identifies the client.
|
||||
|
||||
@@ -24,15 +24,6 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrMissingCACertificate indicates missing CA certificate.
|
||||
ErrMissingCACertificate = errors.New("missing CA certificate for certificate signing")
|
||||
|
||||
// ErrFailedCertCreation indicates failed to certificate creation.
|
||||
ErrFailedCertCreation = errors.New("failed to create client certificate")
|
||||
|
||||
// ErrFailedCertRevocation indicates failed certificate revocation.
|
||||
ErrFailedCertRevocation = errors.New("failed to revoke certificate")
|
||||
|
||||
errFailedCertDecoding = errors.New("failed to decode response from vault service")
|
||||
errFailedToLogin = errors.New("failed to login to Vault")
|
||||
errFailedAppRole = errors.New("failed to create vault new app role")
|
||||
|
||||
@@ -19,9 +19,6 @@ import (
|
||||
|
||||
const chansPrefix = "channels"
|
||||
|
||||
// ErrUnsubscribe indicates an error to unsubscribe.
|
||||
var ErrUnsubscribe = errors.New("unable to unsubscribe")
|
||||
|
||||
// Service specifies CoAP service API.
|
||||
type Service interface {
|
||||
// Publish publishes message to specified channel.
|
||||
|
||||
@@ -61,7 +61,7 @@ func TestMain(m *testing.M) {
|
||||
if err != nil {
|
||||
log.Fatalf("Could not start container: %s", err)
|
||||
}
|
||||
handleInterrupt(m, pool, container)
|
||||
handleInterrupt(pool, container)
|
||||
|
||||
address = fmt.Sprintf("%s:%s", "http://localhost", container.GetPort(port))
|
||||
pool.MaxWait = poolMaxWait
|
||||
@@ -82,7 +82,7 @@ func TestMain(m *testing.M) {
|
||||
os.Exit(code)
|
||||
}
|
||||
|
||||
func handleInterrupt(m *testing.M, pool *dockertest.Pool, container *dockertest.Resource) {
|
||||
func handleInterrupt(pool *dockertest.Pool, container *dockertest.Resource) {
|
||||
c := make(chan os.Signal, 2)
|
||||
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
|
||||
go func() {
|
||||
|
||||
+21
-26
@@ -27,24 +27,19 @@ const protocol = "http"
|
||||
|
||||
// Log message formats.
|
||||
const (
|
||||
LogInfoConnected = "connected with thing_key %s"
|
||||
// ThingPrefix represents the key prefix for Thing authentication scheme.
|
||||
ThingPrefix = "Thing "
|
||||
LogInfoPublished = "published with client_id %s to the topic %s"
|
||||
logInfoConnected = "connected with thing_key %s"
|
||||
logInfoPublished = "published with client_id %s to the topic %s"
|
||||
)
|
||||
|
||||
// Error wrappers for MQTT errors.
|
||||
var (
|
||||
ErrMalformedSubtopic = errors.New("malformed subtopic")
|
||||
ErrClientNotInitialized = errors.New("client is not initialized")
|
||||
ErrMalformedTopic = errors.New("malformed topic")
|
||||
ErrMissingTopicPub = errors.New("failed to publish due to missing topic")
|
||||
ErrMissingTopicSub = errors.New("failed to subscribe due to missing topic")
|
||||
ErrFailedConnect = errors.New("failed to connect")
|
||||
ErrFailedPublish = errors.New("failed to publish")
|
||||
ErrFailedParseSubtopic = errors.New("failed to parse subtopic")
|
||||
ErrFailedPublishConnectEvent = errors.New("failed to publish connect event")
|
||||
ErrFailedPublishToMsgBroker = errors.New("failed to publish to magistrala message broker")
|
||||
errMalformedSubtopic = errors.New("malformed subtopic")
|
||||
errClientNotInitialized = errors.New("client is not initialized")
|
||||
errMalformedTopic = errors.New("malformed topic")
|
||||
errMissingTopicPub = errors.New("failed to publish due to missing topic")
|
||||
errFailedPublish = errors.New("failed to publish")
|
||||
errFailedParseSubtopic = errors.New("failed to parse subtopic")
|
||||
errFailedPublishToMsgBroker = errors.New("failed to publish to magistrala message broker")
|
||||
)
|
||||
|
||||
var channelRegExp = regexp.MustCompile(`^\/?channels\/([\w\-]+)\/messages(\/[^?]*)?(\?.*)?$`)
|
||||
@@ -70,7 +65,7 @@ func NewHandler(publisher messaging.Publisher, logger *slog.Logger, authClient m
|
||||
func (h *handler) AuthConnect(ctx context.Context) error {
|
||||
s, ok := session.FromContext(ctx)
|
||||
if !ok {
|
||||
return ErrClientNotInitialized
|
||||
return errClientNotInitialized
|
||||
}
|
||||
|
||||
var tok string
|
||||
@@ -83,7 +78,7 @@ func (h *handler) AuthConnect(ctx context.Context) error {
|
||||
tok = string(s.Password)
|
||||
}
|
||||
|
||||
h.logger.Info(fmt.Sprintf(LogInfoConnected, tok))
|
||||
h.logger.Info(fmt.Sprintf(logInfoConnected, tok))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -105,20 +100,20 @@ func (h *handler) Connect(ctx context.Context) error {
|
||||
// Publish - after client successfully published.
|
||||
func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) error {
|
||||
if topic == nil {
|
||||
return ErrMissingTopicPub
|
||||
return errMissingTopicPub
|
||||
}
|
||||
topic = &strings.Split(*topic, "?")[0]
|
||||
s, ok := session.FromContext(ctx)
|
||||
if !ok {
|
||||
return errors.Wrap(ErrFailedPublish, ErrClientNotInitialized)
|
||||
return errors.Wrap(errFailedPublish, errClientNotInitialized)
|
||||
}
|
||||
h.logger.Info(fmt.Sprintf(LogInfoPublished, s.ID, *topic))
|
||||
h.logger.Info(fmt.Sprintf(logInfoPublished, s.ID, *topic))
|
||||
// Topics are in the format:
|
||||
// channels/<channel_id>/messages/<subtopic>/.../ct/<content_type>
|
||||
|
||||
channelParts := channelRegExp.FindStringSubmatch(*topic)
|
||||
if len(channelParts) < 2 {
|
||||
return errors.Wrap(ErrFailedPublish, ErrMalformedTopic)
|
||||
return errors.Wrap(errFailedPublish, errMalformedTopic)
|
||||
}
|
||||
|
||||
chanID := channelParts[1]
|
||||
@@ -126,7 +121,7 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
|
||||
|
||||
subtopic, err := parseSubtopic(subtopic)
|
||||
if err != nil {
|
||||
return errors.Wrap(ErrFailedParseSubtopic, err)
|
||||
return errors.Wrap(errFailedParseSubtopic, err)
|
||||
}
|
||||
|
||||
msg := messaging.Message{
|
||||
@@ -162,7 +157,7 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
|
||||
msg.Publisher = res.GetId()
|
||||
|
||||
if err := h.publisher.Publish(ctx, msg.Channel, &msg); err != nil {
|
||||
return errors.Wrap(ErrFailedPublishToMsgBroker, err)
|
||||
return errors.Wrap(errFailedPublishToMsgBroker, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -190,7 +185,7 @@ func parseSubtopic(subtopic string) (string, error) {
|
||||
|
||||
subtopic, err := url.QueryUnescape(subtopic)
|
||||
if err != nil {
|
||||
return "", ErrMalformedSubtopic
|
||||
return "", errMalformedSubtopic
|
||||
}
|
||||
subtopic = strings.ReplaceAll(subtopic, "/", ".")
|
||||
|
||||
@@ -202,7 +197,7 @@ func parseSubtopic(subtopic string) (string, error) {
|
||||
}
|
||||
|
||||
if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
|
||||
return "", ErrMalformedSubtopic
|
||||
return "", errMalformedSubtopic
|
||||
}
|
||||
|
||||
filteredElems = append(filteredElems, elem)
|
||||
@@ -214,9 +209,9 @@ func parseSubtopic(subtopic string) (string, error) {
|
||||
|
||||
// extractThingKey returns value of the thing key. If there is no thing key - an empty value is returned.
|
||||
func extractThingKey(topic string) string {
|
||||
if !strings.HasPrefix(topic, ThingPrefix) {
|
||||
if !strings.HasPrefix(topic, apiutil.ThingPrefix) {
|
||||
return ""
|
||||
}
|
||||
|
||||
return strings.TrimPrefix(topic, ThingPrefix)
|
||||
return strings.TrimPrefix(topic, apiutil.ThingPrefix)
|
||||
}
|
||||
|
||||
@@ -66,9 +66,6 @@ var (
|
||||
// ErrMalformedPolicyAct indicates missing policies action.
|
||||
ErrMalformedPolicyAct = errors.New("malformed policy action")
|
||||
|
||||
// ErrMalformedPolicyRel indicates missing policies relation.
|
||||
ErrMalformedPolicyRel = errors.New("malformed policy relation")
|
||||
|
||||
// ErrMalformedPolicyPer indicates missing policies relation.
|
||||
ErrMalformedPolicyPer = errors.New("malformed policy permission")
|
||||
|
||||
@@ -132,15 +129,6 @@ var (
|
||||
// ErrPasswordFormat indicates weak password.
|
||||
ErrPasswordFormat = errors.New("password does not meet the requirements")
|
||||
|
||||
// ErrMissingOwner indicates missing entity owner.
|
||||
ErrMissingOwner = errors.New("missing entity owner")
|
||||
|
||||
// ErrMissingPolicyOwner indicated malformed policy owner.
|
||||
ErrMissingPolicyOwner = errors.New("malformed policy owner")
|
||||
|
||||
// ErrMissingPolicyEntityType indicates malformed policy entity type.
|
||||
ErrMissingPolicyEntityType = errors.New("malformed or missing entity type")
|
||||
|
||||
// ErrMissingName indicates missing identity name.
|
||||
ErrMissingName = errors.New("missing identity name")
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrMissingEmailTemplate missing email template file.
|
||||
// errMissingEmailTemplate missing email template file.
|
||||
errMissingEmailTemplate = errors.New("Missing e-mail template file")
|
||||
errParseTemplate = errors.New("Parse e-mail template failed")
|
||||
errExecTemplate = errors.New("Execute e-mail template failed")
|
||||
|
||||
@@ -6,9 +6,6 @@ package clients
|
||||
import "errors"
|
||||
|
||||
var (
|
||||
// ErrInvalidStatus indicates invalid status.
|
||||
ErrInvalidStatus = errors.New("invalid client status")
|
||||
|
||||
// ErrEnableClient indicates error in enabling client.
|
||||
ErrEnableClient = errors.New("failed to enable client")
|
||||
|
||||
|
||||
@@ -28,27 +28,12 @@ var (
|
||||
// ErrRemoveEntity indicates error in removing entity.
|
||||
ErrRemoveEntity = errors.New("failed to remove entity")
|
||||
|
||||
// ErrScanMetadata indicates problem with metadata in db.
|
||||
ErrScanMetadata = errors.New("failed to scan metadata in db")
|
||||
|
||||
// ErrWrongSecret indicates a wrong secret was provided.
|
||||
ErrWrongSecret = errors.New("wrong secret")
|
||||
|
||||
// ErrLogin indicates wrong login credentials.
|
||||
ErrLogin = errors.New("invalid user id or secret")
|
||||
|
||||
// ErrFailedOpDB indicates a failure in a database operation.
|
||||
ErrFailedOpDB = errors.New("operation on db element failed")
|
||||
|
||||
// ErrRollbackTx indicates failed to rollback transaction.
|
||||
ErrRollbackTx = errors.New("failed to rollback transaction")
|
||||
|
||||
// ErrMissingSecret indicates missing secret.
|
||||
ErrMissingSecret = errors.New("missing secret")
|
||||
|
||||
// ErrInvalidSecret indicates invalid secret.
|
||||
ErrInvalidSecret = errors.New("missing secret")
|
||||
|
||||
// ErrFailedToRetrieveAllGroups failed to retrieve groups.
|
||||
ErrFailedToRetrieveAllGroups = errors.New("failed to retrieve all groups")
|
||||
)
|
||||
|
||||
@@ -49,21 +49,6 @@ var (
|
||||
// ErrInvalidPolicy indicates that an invalid policy.
|
||||
ErrInvalidPolicy = errors.New("invalid policy")
|
||||
|
||||
// ErrRecoveryToken indicates error in generating password recovery token.
|
||||
ErrRecoveryToken = errors.New("failed to generate password recovery token")
|
||||
|
||||
// ErrFailedPolicyUpdate indicates a failure to update user policy.
|
||||
ErrFailedPolicyUpdate = errors.New("failed to update user policy")
|
||||
|
||||
// ErrPasswordFormat indicates weak password.
|
||||
ErrPasswordFormat = errors.New("password does not meet the requirements")
|
||||
|
||||
// ErrFailedUpdateRole indicates a failure to update user role.
|
||||
ErrFailedUpdateRole = errors.New("failed to update user role")
|
||||
|
||||
// ErrFailedPermissionsList indicates a failure to list permissions.
|
||||
ErrFailedPermissionsList = errors.New("failed to list permissions")
|
||||
|
||||
// ErrEnableClient indicates error in enabling client.
|
||||
ErrEnableClient = errors.New("failed to enable client")
|
||||
|
||||
|
||||
@@ -22,14 +22,14 @@ var (
|
||||
// ErrConnect indicates that connection to MQTT broker failed.
|
||||
ErrConnect = errors.New("failed to connect to MQTT broker")
|
||||
|
||||
// ErrSubscribeTimeout indicates that the subscription failed due to timeout.
|
||||
ErrSubscribeTimeout = errors.New("failed to subscribe due to timeout reached")
|
||||
// errSubscribeTimeout indicates that the subscription failed due to timeout.
|
||||
errSubscribeTimeout = errors.New("failed to subscribe due to timeout reached")
|
||||
|
||||
// ErrUnsubscribeTimeout indicates that unsubscribe failed due to timeout.
|
||||
ErrUnsubscribeTimeout = errors.New("failed to unsubscribe due to timeout reached")
|
||||
// errUnsubscribeTimeout indicates that unsubscribe failed due to timeout.
|
||||
errUnsubscribeTimeout = errors.New("failed to unsubscribe due to timeout reached")
|
||||
|
||||
// ErrUnsubscribeDeleteTopic indicates that unsubscribe failed because the topic was deleted.
|
||||
ErrUnsubscribeDeleteTopic = errors.New("failed to unsubscribe due to deletion of topic")
|
||||
// errUnsubscribeDeleteTopic indicates that unsubscribe failed because the topic was deleted.
|
||||
errUnsubscribeDeleteTopic = errors.New("failed to unsubscribe due to deletion of topic")
|
||||
|
||||
// ErrNotSubscribed indicates that the topic is not subscribed to.
|
||||
ErrNotSubscribed = errors.New("not subscribed")
|
||||
@@ -116,7 +116,7 @@ func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig)
|
||||
return token.Error()
|
||||
}
|
||||
if ok := token.WaitTimeout(ps.timeout); !ok {
|
||||
return ErrSubscribeTimeout
|
||||
return errSubscribeTimeout
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -161,10 +161,10 @@ func (s *subscription) unsubscribe(topic string, timeout time.Duration) error {
|
||||
}
|
||||
|
||||
if ok := token.WaitTimeout(timeout); !ok {
|
||||
return ErrUnsubscribeTimeout
|
||||
return errUnsubscribeTimeout
|
||||
}
|
||||
if ok := s.delete(topic); !ok {
|
||||
return ErrUnsubscribeDeleteTopic
|
||||
return errUnsubscribeDeleteTopic
|
||||
}
|
||||
return token.Error()
|
||||
}
|
||||
|
||||
@@ -52,7 +52,7 @@ func TestMain(m *testing.M) {
|
||||
log.Fatalf("Could not start container: %s", err)
|
||||
}
|
||||
|
||||
handleInterrupt(m, pool, container)
|
||||
handleInterrupt(pool, container)
|
||||
|
||||
address = fmt.Sprintf("%s:%s", "localhost", container.GetPort(port))
|
||||
pool.MaxWait = poolMaxWait
|
||||
@@ -84,7 +84,7 @@ func TestMain(m *testing.M) {
|
||||
}()
|
||||
}
|
||||
|
||||
func handleInterrupt(m *testing.M, pool *dockertest.Pool, container *dockertest.Resource) {
|
||||
func handleInterrupt(pool *dockertest.Pool, container *dockertest.Resource) {
|
||||
c := make(chan os.Signal, 2)
|
||||
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
|
||||
go func() {
|
||||
|
||||
@@ -25,6 +25,11 @@ import (
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
var (
|
||||
errMalformedTopic = errors.New("malformed topic")
|
||||
errFailedPublish = errors.New("failed to publish")
|
||||
)
|
||||
|
||||
func setupMessages() (*httptest.Server, *authmocks.AuthClient, *pubsub.PubSub) {
|
||||
auth := new(authmocks.AuthClient)
|
||||
pub := new(pubsub.PubSub)
|
||||
@@ -99,7 +104,7 @@ func TestSendMessage(t *testing.T) {
|
||||
chanID: "",
|
||||
msg: msg,
|
||||
auth: atoken,
|
||||
err: errors.NewSDKErrorWithStatus(errors.Wrap(adapter.ErrFailedPublish, adapter.ErrMalformedTopic), http.StatusBadRequest),
|
||||
err: errors.NewSDKErrorWithStatus(errors.Wrap(errFailedPublish, errMalformedTopic), http.StatusBadRequest),
|
||||
},
|
||||
"publish message unable to authorize": {
|
||||
chanID: chanID,
|
||||
|
||||
+19
-19
@@ -17,17 +17,17 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrRedisTwinSave indicates error while saving Twin in redis cache.
|
||||
ErrRedisTwinSave = errors.New("failed to save twin in redis cache")
|
||||
// errRedisTwinSave indicates error while saving Twin in redis cache.
|
||||
errRedisTwinSave = errors.New("failed to save twin in redis cache")
|
||||
|
||||
// ErrRedisTwinUpdate indicates error while saving Twin in redis cache.
|
||||
ErrRedisTwinUpdate = errors.New("failed to update twin in redis cache")
|
||||
// errRedisTwinUpdate indicates error while saving Twin in redis cache.
|
||||
errRedisTwinUpdate = errors.New("failed to update twin in redis cache")
|
||||
|
||||
// ErrRedisTwinIDs indicates error while getting Twin IDs from redis cache.
|
||||
ErrRedisTwinIDs = errors.New("failed to get twin id from redis cache")
|
||||
// errRedisTwinIDs indicates error while getting Twin IDs from redis cache.
|
||||
errRedisTwinIDs = errors.New("failed to get twin id from redis cache")
|
||||
|
||||
// ErrRedisTwinRemove indicates error while removing Twin from redis cache.
|
||||
ErrRedisTwinRemove = errors.New("failed to remove twin from redis cache")
|
||||
// errRedisTwinRemove indicates error while removing Twin from redis cache.
|
||||
errRedisTwinRemove = errors.New("failed to remove twin from redis cache")
|
||||
)
|
||||
|
||||
var _ twins.TwinCache = (*twinCache)(nil)
|
||||
@@ -49,10 +49,10 @@ func (tc *twinCache) Save(ctx context.Context, twin twins.Twin) error {
|
||||
|
||||
func (tc *twinCache) Update(ctx context.Context, twin twins.Twin) error {
|
||||
if err := tc.remove(ctx, twin.ID); err != nil {
|
||||
return errors.Wrap(ErrRedisTwinUpdate, err)
|
||||
return errors.Wrap(errRedisTwinUpdate, err)
|
||||
}
|
||||
if err := tc.save(ctx, twin); err != nil {
|
||||
return errors.Wrap(ErrRedisTwinUpdate, err)
|
||||
return errors.Wrap(errRedisTwinUpdate, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -60,10 +60,10 @@ func (tc *twinCache) Update(ctx context.Context, twin twins.Twin) error {
|
||||
func (tc *twinCache) SaveIDs(ctx context.Context, channel, subtopic string, ids []string) error {
|
||||
for _, id := range ids {
|
||||
if err := tc.client.SAdd(ctx, attrKey(channel, subtopic), id).Err(); err != nil {
|
||||
return errors.Wrap(ErrRedisTwinSave, err)
|
||||
return errors.Wrap(errRedisTwinSave, err)
|
||||
}
|
||||
if err := tc.client.SAdd(ctx, twinKey(id), attrKey(channel, subtopic)).Err(); err != nil {
|
||||
return errors.Wrap(ErrRedisTwinSave, err)
|
||||
return errors.Wrap(errRedisTwinSave, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@@ -72,11 +72,11 @@ func (tc *twinCache) SaveIDs(ctx context.Context, channel, subtopic string, ids
|
||||
func (tc *twinCache) IDs(ctx context.Context, channel, subtopic string) ([]string, error) {
|
||||
ids, err := tc.client.SMembers(ctx, attrKey(channel, subtopic)).Result()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(ErrRedisTwinIDs, err)
|
||||
return nil, errors.Wrap(errRedisTwinIDs, err)
|
||||
}
|
||||
idsWildcard, err := tc.client.SMembers(ctx, attrKey(channel, twins.SubtopicWildcard)).Result()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(ErrRedisTwinIDs, err)
|
||||
return nil, errors.Wrap(errRedisTwinIDs, err)
|
||||
}
|
||||
ids = append(ids, idsWildcard...)
|
||||
return ids, nil
|
||||
@@ -93,10 +93,10 @@ func (tc *twinCache) save(ctx context.Context, twin twins.Twin) error {
|
||||
attributes := twin.Definitions[len(twin.Definitions)-1].Attributes
|
||||
for _, attr := range attributes {
|
||||
if err := tc.client.SAdd(ctx, attrKey(attr.Channel, attr.Subtopic), twin.ID).Err(); err != nil {
|
||||
return errors.Wrap(ErrRedisTwinSave, err)
|
||||
return errors.Wrap(errRedisTwinSave, err)
|
||||
}
|
||||
if err := tc.client.SAdd(ctx, twinKey(twin.ID), attrKey(attr.Channel, attr.Subtopic)).Err(); err != nil {
|
||||
return errors.Wrap(ErrRedisTwinSave, err)
|
||||
return errors.Wrap(errRedisTwinSave, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@@ -106,14 +106,14 @@ func (tc *twinCache) remove(ctx context.Context, twinID string) error {
|
||||
twinKey := twinKey(twinID)
|
||||
attrKeys, err := tc.client.SMembers(ctx, twinKey).Result()
|
||||
if err != nil {
|
||||
return errors.Wrap(ErrRedisTwinRemove, err)
|
||||
return errors.Wrap(errRedisTwinRemove, err)
|
||||
}
|
||||
if err := tc.client.Del(ctx, twinKey).Err(); err != nil {
|
||||
return errors.Wrap(ErrRedisTwinRemove, err)
|
||||
return errors.Wrap(errRedisTwinRemove, err)
|
||||
}
|
||||
for _, attrKey := range attrKeys {
|
||||
if err := tc.client.SRem(ctx, attrKey, twinID).Err(); err != nil {
|
||||
return errors.Wrap(ErrRedisTwinRemove, err)
|
||||
return errors.Wrap(errRedisTwinRemove, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -2331,7 +2331,7 @@ func TestGenerateResetToken(t *testing.T) {
|
||||
retrieveByIdentityResponse: client,
|
||||
issueResponse: &magistrala.Token{},
|
||||
issueErr: svcerr.ErrAuthorization,
|
||||
err: svcerr.ErrRecoveryToken,
|
||||
err: svcerr.ErrAuthorization,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
+6
-18
@@ -17,29 +17,17 @@ import (
|
||||
const chansPrefix = "channels"
|
||||
|
||||
var (
|
||||
// ErrFailedMessagePublish indicates that message publishing failed.
|
||||
ErrFailedMessagePublish = errors.New("failed to publish message")
|
||||
// errFailedMessagePublish indicates that message publishing failed.
|
||||
errFailedMessagePublish = errors.New("failed to publish message")
|
||||
|
||||
// ErrFailedSubscription indicates that client couldn't subscribe to specified channel.
|
||||
ErrFailedSubscription = errors.New("failed to subscribe to a channel")
|
||||
|
||||
// ErrFailedUnsubscribe indicates that client couldn't unsubscribe from specified channel.
|
||||
ErrFailedUnsubscribe = errors.New("failed to unsubscribe from a channel")
|
||||
|
||||
// ErrFailedConnection indicates that service couldn't connect to message broker.
|
||||
ErrFailedConnection = errors.New("failed to connect to message broker")
|
||||
|
||||
// ErrInvalidConnection indicates that client couldn't subscribe to message broker.
|
||||
ErrInvalidConnection = errors.New("nats: invalid connection")
|
||||
|
||||
// ErrUnauthorizedAccess indicates that client provided missing or invalid credentials.
|
||||
ErrUnauthorizedAccess = errors.New("missing or invalid credentials provided")
|
||||
// errFailedUnsubscribe indicates that client couldn't unsubscribe from specified channel.
|
||||
errFailedUnsubscribe = errors.New("failed to unsubscribe from a channel")
|
||||
|
||||
// ErrEmptyTopic indicate absence of thingKey in the request.
|
||||
ErrEmptyTopic = errors.New("empty topic")
|
||||
|
||||
// ErrEmptyID indicate absence of channelID in the request.
|
||||
ErrEmptyID = errors.New("empty id")
|
||||
)
|
||||
|
||||
// Service specifies web socket service API.
|
||||
@@ -67,12 +55,12 @@ func New(authClient magistrala.AuthzServiceClient, pubsub messaging.PubSub) Serv
|
||||
|
||||
func (svc *adapterService) Subscribe(ctx context.Context, thingKey, chanID, subtopic string, c *Client) error {
|
||||
if chanID == "" || thingKey == "" {
|
||||
return ErrUnauthorizedAccess
|
||||
return svcerr.ErrAuthentication
|
||||
}
|
||||
|
||||
thingID, err := svc.authorize(ctx, thingKey, chanID, auth.SubscribePermission)
|
||||
if err != nil {
|
||||
return ErrUnauthorizedAccess
|
||||
return svcerr.ErrAuthorization
|
||||
}
|
||||
|
||||
c.id = thingID
|
||||
|
||||
+5
-4
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/absmach/magistrala"
|
||||
authmocks "github.com/absmach/magistrala/auth/mocks"
|
||||
"github.com/absmach/magistrala/internal/testsutil"
|
||||
svcerr "github.com/absmach/magistrala/pkg/errors/service"
|
||||
"github.com/absmach/magistrala/pkg/messaging"
|
||||
"github.com/absmach/magistrala/pkg/messaging/mocks"
|
||||
"github.com/absmach/magistrala/ws"
|
||||
@@ -79,28 +80,28 @@ func TestSubscribe(t *testing.T) {
|
||||
thingKey: authmocks.InvalidValue,
|
||||
chanID: authmocks.InvalidValue,
|
||||
subtopic: subTopic,
|
||||
err: ws.ErrUnauthorizedAccess,
|
||||
err: svcerr.ErrAuthorization,
|
||||
},
|
||||
{
|
||||
desc: "subscribe to channel with empty channel",
|
||||
thingKey: thingKey,
|
||||
chanID: "",
|
||||
subtopic: subTopic,
|
||||
err: ws.ErrUnauthorizedAccess,
|
||||
err: svcerr.ErrAuthentication,
|
||||
},
|
||||
{
|
||||
desc: "subscribe to channel with empty thingKey",
|
||||
thingKey: "",
|
||||
chanID: chanID,
|
||||
subtopic: subTopic,
|
||||
err: ws.ErrUnauthorizedAccess,
|
||||
err: svcerr.ErrAuthentication,
|
||||
},
|
||||
{
|
||||
desc: "subscribe to channel with empty thingKey and empty channel",
|
||||
thingKey: "",
|
||||
chanID: "",
|
||||
subtopic: subTopic,
|
||||
err: ws.ErrUnauthorizedAccess,
|
||||
err: svcerr.ErrAuthentication,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
+1
-1
@@ -111,7 +111,7 @@ func encodeError(w http.ResponseWriter, err error) {
|
||||
var statusCode int
|
||||
|
||||
switch err {
|
||||
case ws.ErrEmptyID, ws.ErrEmptyTopic:
|
||||
case ws.ErrEmptyTopic:
|
||||
statusCode = http.StatusBadRequest
|
||||
case errUnauthorizedAccess:
|
||||
statusCode = http.StatusForbidden
|
||||
|
||||
+24
-29
@@ -35,20 +35,15 @@ const (
|
||||
|
||||
// Error wrappers for MQTT errors.
|
||||
var (
|
||||
ErrMalformedSubtopic = errors.New("malformed subtopic")
|
||||
ErrClientNotInitialized = errors.New("client is not initialized")
|
||||
ErrMalformedTopic = errors.New("malformed topic")
|
||||
ErrMissingClientID = errors.New("client_id not found")
|
||||
ErrMissingTopicPub = errors.New("failed to publish due to missing topic")
|
||||
ErrMissingTopicSub = errors.New("failed to subscribe due to missing topic")
|
||||
ErrFailedConnect = errors.New("failed to connect")
|
||||
ErrFailedSubscribe = errors.New("failed to subscribe")
|
||||
ErrFailedPublish = errors.New("failed to publish")
|
||||
ErrFailedDisconnect = errors.New("failed to disconnect")
|
||||
ErrFailedPublishDisconnectEvent = errors.New("failed to publish disconnect event")
|
||||
ErrFailedParseSubtopic = errors.New("failed to parse subtopic")
|
||||
ErrFailedPublishConnectEvent = errors.New("failed to publish connect event")
|
||||
ErrFailedPublishToMsgBroker = errors.New("failed to publish to magistrala message broker")
|
||||
errMalformedSubtopic = errors.New("malformed subtopic")
|
||||
errClientNotInitialized = errors.New("client is not initialized")
|
||||
errMalformedTopic = errors.New("malformed topic")
|
||||
errMissingTopicPub = errors.New("failed to publish due to missing topic")
|
||||
errMissingTopicSub = errors.New("failed to subscribe due to missing topic")
|
||||
errFailedSubscribe = errors.New("failed to subscribe")
|
||||
errFailedPublish = errors.New("failed to publish")
|
||||
errFailedParseSubtopic = errors.New("failed to parse subtopic")
|
||||
errFailedPublishToMsgBroker = errors.New("failed to publish to magistrala message broker")
|
||||
)
|
||||
|
||||
var channelRegExp = regexp.MustCompile(`^\/?channels\/([\w\-]+)\/messages(\/[^?]*)?(\?.*)?$`)
|
||||
@@ -79,11 +74,11 @@ func (h *handler) AuthConnect(ctx context.Context) error {
|
||||
// prior forwarding to the ws server.
|
||||
func (h *handler) AuthPublish(ctx context.Context, topic *string, payload *[]byte) error {
|
||||
if topic == nil {
|
||||
return ErrMissingTopicPub
|
||||
return errMissingTopicPub
|
||||
}
|
||||
s, ok := session.FromContext(ctx)
|
||||
if !ok {
|
||||
return ErrClientNotInitialized
|
||||
return errClientNotInitialized
|
||||
}
|
||||
|
||||
var token string
|
||||
@@ -102,10 +97,10 @@ func (h *handler) AuthPublish(ctx context.Context, topic *string, payload *[]byt
|
||||
func (h *handler) AuthSubscribe(ctx context.Context, topics *[]string) error {
|
||||
s, ok := session.FromContext(ctx)
|
||||
if !ok {
|
||||
return ErrClientNotInitialized
|
||||
return errClientNotInitialized
|
||||
}
|
||||
if topics == nil || *topics == nil {
|
||||
return ErrMissingTopicSub
|
||||
return errMissingTopicSub
|
||||
}
|
||||
|
||||
var token string
|
||||
@@ -134,19 +129,19 @@ func (h *handler) Connect(ctx context.Context) error {
|
||||
func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) error {
|
||||
s, ok := session.FromContext(ctx)
|
||||
if !ok {
|
||||
return errors.Wrap(ErrFailedPublish, ErrClientNotInitialized)
|
||||
return errors.Wrap(errFailedPublish, errClientNotInitialized)
|
||||
}
|
||||
h.logger.Info(fmt.Sprintf(LogInfoPublished, s.ID, *topic))
|
||||
|
||||
if len(*payload) == 0 {
|
||||
return ErrFailedMessagePublish
|
||||
return errFailedMessagePublish
|
||||
}
|
||||
|
||||
// Topics are in the format:
|
||||
// channels/<channel_id>/messages/<subtopic>/.../ct/<content_type>
|
||||
channelParts := channelRegExp.FindStringSubmatch(*topic)
|
||||
if len(channelParts) < 2 {
|
||||
return errors.Wrap(ErrFailedPublish, ErrMalformedTopic)
|
||||
return errors.Wrap(errFailedPublish, errMalformedTopic)
|
||||
}
|
||||
|
||||
chanID := channelParts[1]
|
||||
@@ -154,7 +149,7 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
|
||||
|
||||
subtopic, err := parseSubtopic(subtopic)
|
||||
if err != nil {
|
||||
return errors.Wrap(ErrFailedParseSubtopic, err)
|
||||
return errors.Wrap(errFailedParseSubtopic, err)
|
||||
}
|
||||
|
||||
var token string
|
||||
@@ -190,7 +185,7 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
|
||||
}
|
||||
|
||||
if err := h.pubsub.Publish(ctx, msg.GetChannel(), &msg); err != nil {
|
||||
return errors.Wrap(ErrFailedPublishToMsgBroker, err)
|
||||
return errors.Wrap(errFailedPublishToMsgBroker, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -200,7 +195,7 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
|
||||
func (h *handler) Subscribe(ctx context.Context, topics *[]string) error {
|
||||
s, ok := session.FromContext(ctx)
|
||||
if !ok {
|
||||
return errors.Wrap(ErrFailedSubscribe, ErrClientNotInitialized)
|
||||
return errors.Wrap(errFailedSubscribe, errClientNotInitialized)
|
||||
}
|
||||
h.logger.Info(fmt.Sprintf(LogInfoSubscribed, s.ID, strings.Join(*topics, ",")))
|
||||
return nil
|
||||
@@ -210,7 +205,7 @@ func (h *handler) Subscribe(ctx context.Context, topics *[]string) error {
|
||||
func (h *handler) Unsubscribe(ctx context.Context, topics *[]string) error {
|
||||
s, ok := session.FromContext(ctx)
|
||||
if !ok {
|
||||
return errors.Wrap(ErrFailedUnsubscribe, ErrClientNotInitialized)
|
||||
return errors.Wrap(errFailedUnsubscribe, errClientNotInitialized)
|
||||
}
|
||||
|
||||
h.logger.Info(fmt.Sprintf(LogInfoUnsubscribed, s.ID, strings.Join(*topics, ",")))
|
||||
@@ -226,12 +221,12 @@ func (h *handler) authAccess(ctx context.Context, password, topic, action string
|
||||
// Topics are in the format:
|
||||
// channels/<channel_id>/messages/<subtopic>/.../ct/<content_type>
|
||||
if !channelRegExp.MatchString(topic) {
|
||||
return ErrMalformedTopic
|
||||
return errMalformedTopic
|
||||
}
|
||||
|
||||
channelParts := channelRegExp.FindStringSubmatch(topic)
|
||||
if len(channelParts) < 1 {
|
||||
return ErrMalformedTopic
|
||||
return errMalformedTopic
|
||||
}
|
||||
|
||||
chanID := channelParts[1]
|
||||
@@ -261,7 +256,7 @@ func parseSubtopic(subtopic string) (string, error) {
|
||||
|
||||
subtopic, err := url.QueryUnescape(subtopic)
|
||||
if err != nil {
|
||||
return "", ErrMalformedSubtopic
|
||||
return "", errMalformedSubtopic
|
||||
}
|
||||
subtopic = strings.ReplaceAll(subtopic, "/", ".")
|
||||
|
||||
@@ -273,7 +268,7 @@ func parseSubtopic(subtopic string) (string, error) {
|
||||
}
|
||||
|
||||
if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
|
||||
return "", ErrMalformedSubtopic
|
||||
return "", errMalformedSubtopic
|
||||
}
|
||||
|
||||
filteredElems = append(filteredElems, elem)
|
||||
|
||||
Reference in New Issue
Block a user