NOISSUE - Add Readers and Consumers SDK (#33)
Continuous Delivery / Build and Push (push) Waiting to run
Check License Header / check-license (push) Waiting to run
Deploy GitHub Pages / swagger-ui (push) Waiting to run

* refactor: aligh bootstrap with new supermq architecture

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>

* refactor: rename env variables

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>

* style: add empty line to config files and bootstrap docker compose file

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>

* refactor: add supermq sdk to magistrala sdk

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>

* refactor: extend supermq sdk in magistrala sdk

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>

* reafctor: update responses

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>

* feat: add readers and consumers sdk

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>

* ci(messages.go): fix filename

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>

* feat: add readers sdk

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>

* refactor: remove notifier interface

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>

* refactor: remove notifier interface

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>

---------

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>
This commit is contained in:
Felix Gateru
2025-01-13 12:31:45 +03:00
committed by GitHub
parent ec71a5edfd
commit cd528e75d3
35 changed files with 5353 additions and 261 deletions
+3 -3
View File
@@ -13,7 +13,7 @@ info:
license:
name: Apache 2.0
url: https://github.com/absmach/magistrala/blob/main/LICENSE
version: 0.14.0
version: 0.15.1
servers:
- url: http://localhost:9014
@@ -44,7 +44,7 @@ paths:
"400":
description: Failed due to malformed JSON.
"401":
description: Missing or invalid access token provided.
description: Missing or invalid access token provided.
"403":
description: Failed to perform authorization over the entity.
"409":
@@ -278,7 +278,7 @@ components:
content:
application/health+json:
schema:
$ref: "./schemas/HealthInfo.yml"
$ref: "./schemas/health_info.yml"
securitySchemes:
bearerAuth:
+2 -2
View File
@@ -13,7 +13,7 @@ info:
license:
name: Apache 2.0
url: https://github.com/absmach/magistrala/blob/main/LICENSE
version: 0.14.0
version: 0.15.1
servers:
- url: http://localhost:9003
@@ -292,7 +292,7 @@ components:
content:
application/health+json:
schema:
$ref: "./schemas/HealthInfo.yml"
$ref: "./schemas/health_info.yml"
securitySchemes:
bearerAuth:
+1 -1
View File
@@ -12,6 +12,7 @@ import (
"os"
chclient "github.com/absmach/callhome/pkg/client"
httpapi "github.com/absmach/magistrala/readers/api"
"github.com/absmach/magistrala/readers/postgres"
"github.com/absmach/supermq"
smqlog "github.com/absmach/supermq/logger"
@@ -23,7 +24,6 @@ import (
httpserver "github.com/absmach/supermq/pkg/server/http"
"github.com/absmach/supermq/pkg/uuid"
"github.com/absmach/supermq/readers"
httpapi "github.com/absmach/supermq/readers/api"
"github.com/caarlos0/env/v11"
"github.com/jmoiron/sqlx"
"golang.org/x/sync/errgroup"
+1 -1
View File
@@ -12,6 +12,7 @@ import (
"os"
chclient "github.com/absmach/callhome/pkg/client"
httpapi "github.com/absmach/magistrala/readers/api"
"github.com/absmach/magistrala/readers/timescale"
"github.com/absmach/supermq"
smqlog "github.com/absmach/supermq/logger"
@@ -23,7 +24,6 @@ import (
httpserver "github.com/absmach/supermq/pkg/server/http"
"github.com/absmach/supermq/pkg/uuid"
"github.com/absmach/supermq/readers"
httpapi "github.com/absmach/supermq/readers/api"
"github.com/caarlos0/env/v11"
"github.com/jmoiron/sqlx"
"golang.org/x/sync/errgroup"
+1 -1
View File
@@ -6,8 +6,8 @@ package api
import (
"context"
notifiers "github.com/absmach/magistrala/consumers/notifiers"
apiutil "github.com/absmach/supermq/api/http/util"
notifiers "github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/supermq/pkg/errors"
"github.com/go-kit/kit/endpoint"
)
+1 -1
View File
@@ -13,11 +13,11 @@ import (
"strings"
"testing"
"github.com/absmach/magistrala/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers/api"
"github.com/absmach/magistrala/consumers/notifiers/mocks"
"github.com/absmach/magistrala/internal/testsutil"
apiutil "github.com/absmach/supermq/api/http/util"
"github.com/absmach/supermq/consumers/notifiers"
smqlog "github.com/absmach/supermq/logger"
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/uuid"
+1 -1
View File
@@ -10,7 +10,7 @@ import (
"log/slog"
"time"
"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers"
)
var _ notifiers.Service = (*loggingMiddleware)(nil)
+1 -1
View File
@@ -9,7 +9,7 @@ import (
"context"
"time"
"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers"
"github.com/go-kit/kit/metrics"
)
+1 -1
View File
@@ -10,10 +10,10 @@ import (
"net/http"
"strings"
"github.com/absmach/magistrala/consumers/notifiers"
"github.com/absmach/supermq"
api "github.com/absmach/supermq/api/http"
apiutil "github.com/absmach/supermq/api/http/util"
"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/supermq/pkg/errors"
"github.com/go-chi/chi/v5"
kithttp "github.com/go-kit/kit/transport/http"
-47
View File
@@ -1,47 +0,0 @@
// Code generated by mockery v2.43.2. DO NOT EDIT.
// Copyright (c) Abstract Machines
package mocks
import (
messaging "github.com/absmach/supermq/pkg/messaging"
mock "github.com/stretchr/testify/mock"
)
// Notifier is an autogenerated mock type for the Notifier type
type Notifier struct {
mock.Mock
}
// Notify provides a mock function with given fields: from, to, msg
func (_m *Notifier) Notify(from string, to []string, msg *messaging.Message) error {
ret := _m.Called(from, to, msg)
if len(ret) == 0 {
panic("no return value specified for Notify")
}
var r0 error
if rf, ok := ret.Get(0).(func(string, []string, *messaging.Message) error); ok {
r0 = rf(from, to, msg)
} else {
r0 = ret.Error(0)
}
return r0
}
// NewNotifier creates a new instance of Notifier. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewNotifier(t interface {
mock.TestingT
Cleanup(func())
}) *Notifier {
mock := &Notifier{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
+1 -1
View File
@@ -7,7 +7,7 @@ package mocks
import (
context "context"
notifiers "github.com/absmach/supermq/consumers/notifiers"
notifiers "github.com/absmach/magistrala/consumers/notifiers"
mock "github.com/stretchr/testify/mock"
)
+1 -1
View File
@@ -7,7 +7,7 @@ package mocks
import (
context "context"
notifiers "github.com/absmach/supermq/consumers/notifiers"
notifiers "github.com/absmach/magistrala/consumers/notifiers"
mock "github.com/stretchr/testify/mock"
)
@@ -9,7 +9,7 @@ import (
"fmt"
"strings"
"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers"
"github.com/absmach/supermq/pkg/errors"
repoerr "github.com/absmach/supermq/pkg/errors/repository"
"github.com/jackc/pgerrcode"
@@ -8,8 +8,8 @@ import (
"fmt"
"testing"
"github.com/absmach/magistrala/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers/postgres"
"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/supermq/pkg/errors"
repoerr "github.com/absmach/supermq/pkg/errors/repository"
"github.com/stretchr/testify/assert"
+4 -5
View File
@@ -9,7 +9,6 @@ import (
"github.com/absmach/supermq"
"github.com/absmach/supermq/consumers"
notif "github.com/absmach/supermq/consumers/notifiers"
smqauthn "github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
@@ -47,13 +46,13 @@ type notifierService struct {
authn smqauthn.Authentication
subs SubscriptionsRepository
idp supermq.IDProvider
notifier notif.Notifier
notifier consumers.Notifier
errCh chan error
from string
}
// New instantiates the subscriptions service implementation.
func New(authn smqauthn.Authentication, subs SubscriptionsRepository, idp supermq.IDProvider, notifier notif.Notifier, from string) Service {
func New(authn smqauthn.Authentication, subs SubscriptionsRepository, idp supermq.IDProvider, notifier consumers.Notifier, from string) Service {
return &notifierService{
authn: authn,
subs: subs,
@@ -132,7 +131,7 @@ func (ns *notifierService) ConsumeBlocking(ctx context.Context, message interfac
if len(to) > 0 {
err := ns.notifier.Notify(ns.from, to, msg)
if err != nil {
return errors.Wrap(notif.ErrNotify, err)
return errors.Wrap(consumers.ErrNotify, err)
}
}
@@ -166,7 +165,7 @@ func (ns *notifierService) ConsumeAsync(ctx context.Context, message interface{}
}
if len(to) > 0 {
if err := ns.notifier.Notify(ns.from, to, msg); err != nil {
ns.errCh <- errors.Wrap(notif.ErrNotify, err)
ns.errCh <- errors.Wrap(consumers.ErrNotify, err)
}
}
}
+5 -3
View File
@@ -8,9 +8,11 @@ import (
"fmt"
"testing"
"github.com/absmach/magistrala/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers/mocks"
"github.com/absmach/magistrala/internal/testsutil"
"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/supermq/consumers"
smqmocks "github.com/absmach/supermq/consumers/mocks"
smqauthn "github.com/absmach/supermq/pkg/authn"
authnmocks "github.com/absmach/supermq/pkg/authn/mocks"
"github.com/absmach/supermq/pkg/errors"
@@ -32,7 +34,7 @@ const (
func newService() (notifiers.Service, *authnmocks.Authentication, *mocks.SubscriptionsRepository) {
repo := new(mocks.SubscriptionsRepository)
auth := new(authnmocks.Authentication)
notifier := new(mocks.Notifier)
notifier := new(smqmocks.Notifier)
idp := uuid.NewMock()
from := "exampleFrom"
return notifiers.New(auth, repo, idp, notifier, from), auth, repo
@@ -346,7 +348,7 @@ func TestConsume(t *testing.T) {
{
desc: "test fail",
msg: &errMsg,
err: notifiers.ErrNotify,
err: consumers.ErrNotify,
},
}
+3 -3
View File
@@ -6,7 +6,7 @@ package smpp
import (
"time"
"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/supermq/consumers"
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/transformers"
"github.com/absmach/supermq/pkg/transformers/json"
@@ -15,7 +15,7 @@ import (
"github.com/fiorix/go-smpp/smpp/pdu/pdutext"
)
var _ notifiers.Notifier = (*notifier)(nil)
var _ consumers.Notifier = (*notifier)(nil)
type notifier struct {
transmitter *smpp.Transmitter
@@ -27,7 +27,7 @@ type notifier struct {
}
// New instantiates SMTP message notifier.
func New(cfg Config) notifiers.Notifier {
func New(cfg Config) consumers.Notifier {
t := &smpp.Transmitter{
Addr: cfg.Address,
User: cfg.Username,
+3 -3
View File
@@ -7,7 +7,7 @@ import (
"fmt"
"github.com/absmach/magistrala/internal/email"
"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/supermq/consumers"
"github.com/absmach/supermq/pkg/messaging"
)
@@ -16,14 +16,14 @@ const (
contentTemplate = "A publisher with an id %s sent the message over %s with the following values \n %s"
)
var _ notifiers.Notifier = (*notifier)(nil)
var _ consumers.Notifier = (*notifier)(nil)
type notifier struct {
agent *email.Agent
}
// New instantiates SMTP message notifier.
func New(agent *email.Agent) notifiers.Notifier {
func New(agent *email.Agent) consumers.Notifier {
return &notifier{agent: agent}
}
+1 -1
View File
@@ -8,7 +8,7 @@ package tracing
import (
"context"
"github.com/absmach/supermq/consumers/notifiers"
"github.com/absmach/magistrala/consumers/notifiers"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
+2 -3
View File
@@ -6,13 +6,12 @@ require (
github.com/0x6flab/namegenerator v1.4.0
github.com/absmach/callhome v0.14.0
github.com/absmach/certs v0.0.0-20241209153600-91270de67b5a
github.com/absmach/supermq v0.16.1-0.20250110085603-df5d752c4b50
github.com/absmach/supermq v0.16.1-0.20250113091433-3a11b54394b2
github.com/authzed/authzed-go v1.2.1
github.com/authzed/grpcutil v0.0.0-20240123194739-2ea1e3d2d98b
github.com/caarlos0/env/v11 v11.3.1
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/fiorix/go-smpp v0.0.0-20210403173735-2894b96e70ba
github.com/go-chi/chi v4.1.2+incompatible
github.com/go-chi/chi/v5 v5.2.0
github.com/go-kit/kit v0.13.0
github.com/gofrs/uuid/v5 v5.3.0
@@ -37,7 +36,7 @@ require (
go.opentelemetry.io/otel/trace v1.33.0
golang.org/x/sync v0.10.0
gonum.org/v1/gonum v0.15.1
google.golang.org/grpc v1.69.2
google.golang.org/grpc v1.69.4
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
moul.io/http2curl v1.0.0
)
+4 -6
View File
@@ -27,8 +27,8 @@ github.com/absmach/mgate v0.4.5 h1:l6RmrEsR9jxkdb9WHUSecmT0HA41TkZZQVffFfUAIfI=
github.com/absmach/mgate v0.4.5/go.mod h1:IvRIHZexZPEIAPmmaJF0L5DY2ERjj+GxRGitOW4s6qo=
github.com/absmach/senml v1.0.6 h1:WPeIl6vQ00k7ghWSZYT/QP0KUxq2+4zQoaC7240pLFk=
github.com/absmach/senml v1.0.6/go.mod h1:QnJNPy1DJPy0+qUW21PTcH/xoh0LgfYZxTfwriMIvmQ=
github.com/absmach/supermq v0.16.1-0.20250110085603-df5d752c4b50 h1:ndn1Z9wxUIH5chingm2hy3ZhIMt0+lDjD/CFaBEULbY=
github.com/absmach/supermq v0.16.1-0.20250110085603-df5d752c4b50/go.mod h1:VihyvWijocoz2yhXGAL+qHtid24O+qL/N2lxP2vRf/c=
github.com/absmach/supermq v0.16.1-0.20250113091433-3a11b54394b2 h1:mG0ucTmOE41BOzKpZyNkGHbLFangumlZd3TUddJ0J2U=
github.com/absmach/supermq v0.16.1-0.20250113091433-3a11b54394b2/go.mod h1:DjM4b/VF9m/o/MmQ/WUExbdatIvPnqyREe9zFEbW2YI=
github.com/authzed/authzed-go v1.2.1 h1:o54aIs0ocDfVJl/rfIt/75vrb6z+tgPuXjMlSsSEwH0=
github.com/authzed/authzed-go v1.2.1/go.mod h1:/+NblSrzA6Lm6vUO3fqZyLh8MDCLUQq2AyJMlHb32DE=
github.com/authzed/grpcutil v0.0.0-20240123194739-2ea1e3d2d98b h1:wbh8IK+aMLTCey9sZasO7b6BWLAJnHHvb79fvWCXwxw=
@@ -99,8 +99,6 @@ github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ=
github.com/go-chi/chi v4.1.2+incompatible h1:fGFk2Gmi/YKXk0OmGfBh0WgmN3XB8lVnEyNz34tQRec=
github.com/go-chi/chi v4.1.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ=
github.com/go-chi/chi/v5 v5.2.0 h1:Aj1EtB0qR2Rdo2dG4O94RIU35w2lvQSj6BRA4+qwFL0=
github.com/go-chi/chi/v5 v5.2.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-gorp/gorp/v3 v3.1.0 h1:ItKF/Vbuj31dmV4jxA1qblpSwkl9g1typ24xoe70IGs=
@@ -593,8 +591,8 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU=
google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A=
google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4=
google.golang.org/protobuf v1.36.2 h1:R8FeyR1/eLmkutZOM5CWghmo5itiG9z0ktFlTVLuTmU=
google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc h1:2gGKlE2+asNV9m7xrywl36YYNnBG5ZQ0r/BOOxqPpmk=
+87
View File
@@ -0,0 +1,87 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package sdk
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"github.com/absmach/supermq/pkg/errors"
)
const subscriptionEndpoint = "subscriptions"
type Subscription struct {
ID string `json:"id,omitempty"`
OwnerID string `json:"owner_id,omitempty"`
Topic string `json:"topic,omitempty"`
Contact string `json:"contact,omitempty"`
}
func (sdk mgSDK) CreateSubscription(topic, contact, token string) (string, errors.SDKError) {
sub := Subscription{
Topic: topic,
Contact: contact,
}
data, err := json.Marshal(sub)
if err != nil {
return "", errors.NewSDKError(err)
}
url := fmt.Sprintf("%s/%s", sdk.usersURL, subscriptionEndpoint)
headers, _, sdkerr := sdk.processRequest(http.MethodPost, url, token, data, nil, http.StatusCreated)
if sdkerr != nil {
return "", sdkerr
}
id := strings.TrimPrefix(headers.Get("Location"), fmt.Sprintf("/%s/", subscriptionEndpoint))
return id, nil
}
func (sdk mgSDK) ListSubscriptions(pm PageMetadata, token string) (SubscriptionPage, errors.SDKError) {
url, err := sdk.withQueryParams(sdk.usersURL, subscriptionEndpoint, pm)
if err != nil {
return SubscriptionPage{}, errors.NewSDKError(err)
}
_, body, sdkerr := sdk.processRequest(http.MethodGet, url, token, nil, nil, http.StatusOK)
if sdkerr != nil {
return SubscriptionPage{}, sdkerr
}
var sp SubscriptionPage
if err := json.Unmarshal(body, &sp); err != nil {
return SubscriptionPage{}, errors.NewSDKError(err)
}
return sp, nil
}
func (sdk mgSDK) ViewSubscription(id, token string) (Subscription, errors.SDKError) {
url := fmt.Sprintf("%s/%s/%s", sdk.usersURL, subscriptionEndpoint, id)
_, body, err := sdk.processRequest(http.MethodGet, url, token, nil, nil, http.StatusOK)
if err != nil {
return Subscription{}, err
}
var sub Subscription
if err := json.Unmarshal(body, &sub); err != nil {
return Subscription{}, errors.NewSDKError(err)
}
return sub, nil
}
func (sdk mgSDK) DeleteSubscription(id, token string) errors.SDKError {
url := fmt.Sprintf("%s/%s/%s", sdk.usersURL, subscriptionEndpoint, id)
_, _, err := sdk.processRequest(http.MethodDelete, url, token, nil, nil, http.StatusNoContent)
return err
}
+456
View File
@@ -0,0 +1,456 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package sdk_test
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"github.com/absmach/magistrala/consumers/notifiers"
httpapi "github.com/absmach/magistrala/consumers/notifiers/api"
notmocks "github.com/absmach/magistrala/consumers/notifiers/mocks"
"github.com/absmach/magistrala/internal/testsutil"
sdk "github.com/absmach/magistrala/pkg/sdk"
apiutil "github.com/absmach/supermq/api/http/util"
smqlog "github.com/absmach/supermq/logger"
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
supermqSDK "github.com/absmach/supermq/pkg/sdk"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
var (
ownerID = testsutil.GenerateUUID(&testing.T{})
subID = testsutil.GenerateUUID(&testing.T{})
sdkSubReq = sdk.Subscription{
Topic: "topic",
Contact: "contact",
}
sdkSubRes = sdk.Subscription{
Topic: "topic",
Contact: "contact",
OwnerID: ownerID,
ID: subID,
}
notSubReq = notifiers.Subscription{
Contact: "contact",
Topic: "topic",
}
notSubRes = notifiers.Subscription{
Contact: "contact",
Topic: "topic",
OwnerID: ownerID,
ID: subID,
}
wrongID = "wrongID"
instanceID = "instanceID"
contentType = supermqSDK.CTJSON
)
func setupSubscriptions() (*httptest.Server, *notmocks.Service) {
nsvc := new(notmocks.Service)
logger := smqlog.NewMock()
mux := httpapi.MakeHandler(nsvc, logger, instanceID)
return httptest.NewServer(mux), nsvc
}
func TestCreateSubscription(t *testing.T) {
ts, nsvc := setupSubscriptions()
defer ts.Close()
sdkConf := sdk.Config{
UsersURL: ts.URL,
MsgContentType: contentType,
TLSVerification: false,
}
mgsdk := sdk.NewSDK(sdkConf)
cases := []struct {
desc string
subscription sdk.Subscription
token string
empty bool
id string
svcReq notifiers.Subscription
svcErr error
svcRes string
err errors.SDKError
}{
{
desc: "create new subscription",
subscription: sdkSubReq,
token: validToken,
empty: false,
svcReq: notSubReq,
svcRes: subID,
svcErr: nil,
err: nil,
},
{
desc: "create new subscription with empty token",
subscription: sdkSubReq,
token: "",
empty: true,
svcReq: notifiers.Subscription{},
svcRes: "",
svcErr: nil,
err: errors.NewSDKErrorWithStatus(errors.Wrap(apiutil.ErrValidation, apiutil.ErrBearerToken), http.StatusUnauthorized),
},
{
desc: "create new subscription with invalid token",
subscription: sdkSubReq,
token: invalidToken,
empty: true,
svcReq: notSubReq,
svcRes: "",
svcErr: svcerr.ErrAuthentication,
err: errors.NewSDKErrorWithStatus(svcerr.ErrAuthentication, http.StatusUnauthorized),
},
{
desc: "create new subscription with empty topic",
subscription: sdk.Subscription{
Topic: "",
Contact: "contact",
},
token: validToken,
empty: true,
svcReq: notifiers.Subscription{},
svcErr: nil,
svcRes: "",
err: errors.NewSDKErrorWithStatus(errors.Wrap(apiutil.ErrValidation, apiutil.ErrInvalidTopic), http.StatusBadRequest),
},
{
desc: "create new subscription with empty contact",
subscription: sdk.Subscription{
Topic: "topic",
Contact: "",
},
token: validToken,
empty: true,
svcReq: notifiers.Subscription{},
svcErr: nil,
svcRes: "",
err: errors.NewSDKErrorWithStatus(errors.Wrap(apiutil.ErrValidation, apiutil.ErrInvalidContact), http.StatusBadRequest),
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
svcCall := nsvc.On("CreateSubscription", mock.Anything, tc.token, tc.svcReq).Return(tc.svcRes, tc.svcErr)
loc, err := mgsdk.CreateSubscription(tc.subscription.Topic, tc.subscription.Contact, tc.token)
assert.Equal(t, tc.err, err)
assert.Equal(t, tc.empty, loc == "")
if tc.err == nil {
ok := svcCall.Parent.AssertCalled(t, "CreateSubscription", mock.Anything, tc.token, tc.svcReq)
assert.True(t, ok)
}
svcCall.Unset()
})
}
}
func TestViewSubscription(t *testing.T) {
ts, nsvc := setupSubscriptions()
defer ts.Close()
sdkConf := sdk.Config{
UsersURL: ts.URL,
MsgContentType: contentType,
TLSVerification: false,
}
mgsdk := sdk.NewSDK(sdkConf)
cases := []struct {
desc string
subID string
token string
svcRes notifiers.Subscription
svcErr error
response sdk.Subscription
err errors.SDKError
}{
{
desc: "view existing subscription",
subID: subID,
token: validToken,
svcRes: notSubRes,
svcErr: nil,
response: sdkSubRes,
err: nil,
},
{
desc: "view non-existent subscription",
subID: wrongID,
token: validToken,
svcRes: notifiers.Subscription{},
svcErr: svcerr.ErrNotFound,
response: sdk.Subscription{},
err: errors.NewSDKErrorWithStatus(svcerr.ErrNotFound, http.StatusNotFound),
},
{
desc: "view subscription with invalid token",
subID: subID,
token: invalidToken,
svcRes: notifiers.Subscription{},
svcErr: svcerr.ErrAuthentication,
response: sdk.Subscription{},
err: errors.NewSDKErrorWithStatus(svcerr.ErrAuthentication, http.StatusUnauthorized),
},
{
desc: "view subscription with empty token",
subID: subID,
token: "",
svcRes: notifiers.Subscription{},
svcErr: nil,
response: sdk.Subscription{},
err: errors.NewSDKErrorWithStatus(errors.Wrap(apiutil.ErrValidation, apiutil.ErrBearerToken), http.StatusUnauthorized),
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
svcCall := nsvc.On("ViewSubscription", mock.Anything, tc.token, tc.subID).Return(tc.svcRes, tc.svcErr)
resp, err := mgsdk.ViewSubscription(tc.subID, tc.token)
assert.Equal(t, tc.err, err)
assert.Equal(t, tc.response, resp)
if tc.err == nil {
ok := svcCall.Parent.AssertCalled(t, "ViewSubscription", mock.Anything, tc.token, tc.subID)
assert.True(t, ok)
}
svcCall.Unset()
})
}
}
func TestListSubscription(t *testing.T) {
ts, nsvc := setupSubscriptions()
defer ts.Close()
sdkConf := sdk.Config{
UsersURL: ts.URL,
MsgContentType: contentType,
TLSVerification: false,
}
mgsdk := sdk.NewSDK(sdkConf)
nSubs := 10
noSubs := []notifiers.Subscription{}
sdSubs := []sdk.Subscription{}
for i := 0; i < nSubs; i++ {
nosub := notifiers.Subscription{
OwnerID: ownerID,
Topic: fmt.Sprintf("topic_%d", i),
Contact: fmt.Sprintf("contact_%d", i),
}
noSubs = append(noSubs, nosub)
sdsub := sdk.Subscription{
OwnerID: ownerID,
Topic: fmt.Sprintf("topic_%d", i),
Contact: fmt.Sprintf("contact_%d", i),
}
sdSubs = append(sdSubs, sdsub)
}
cases := []struct {
desc string
token string
pageMeta sdk.PageMetadata
svcReq notifiers.PageMetadata
svcRes notifiers.Page
svcErr error
response sdk.SubscriptionPage
err errors.SDKError
}{
{
desc: "list all subscription",
token: validToken,
pageMeta: sdk.PageMetadata{
Offset: 0,
Limit: 10,
},
svcReq: notifiers.PageMetadata{
Offset: 0,
Limit: 10,
},
svcRes: notifiers.Page{
Total: 10,
Subscriptions: noSubs,
},
svcErr: nil,
response: sdk.SubscriptionPage{
PageRes: sdk.PageRes{
Total: 10,
},
Subscriptions: sdSubs,
},
err: nil,
},
{
desc: "list subscription with specific topic",
token: validToken,
pageMeta: sdk.PageMetadata{
Offset: 0,
Limit: 10,
Topic: "topic_1",
},
svcReq: notifiers.PageMetadata{
Offset: 0,
Limit: 10,
Topic: "topic_1",
},
svcRes: notifiers.Page{
Total: uint(len(noSubs[1:2])),
Subscriptions: noSubs[1:2],
},
svcErr: nil,
response: sdk.SubscriptionPage{
PageRes: sdk.PageRes{
Total: uint64(len(sdSubs[1:2])),
},
Subscriptions: sdSubs[1:2],
},
err: nil,
},
{
desc: "list subscription with specific contact",
token: validToken,
pageMeta: sdk.PageMetadata{
Offset: 0,
Limit: 10,
Contact: "contact_1",
},
svcReq: notifiers.PageMetadata{
Offset: 0,
Limit: 10,
Contact: "contact_1",
},
svcRes: notifiers.Page{
Total: uint(len(noSubs[1:2])),
Subscriptions: noSubs[1:2],
},
svcErr: nil,
response: sdk.SubscriptionPage{
PageRes: sdk.PageRes{
Total: uint64(len(sdSubs[1:2])),
},
Subscriptions: sdSubs[1:2],
},
err: nil,
},
{
desc: "list subscription with invalid token",
token: invalidToken,
pageMeta: sdk.PageMetadata{
Offset: 0,
Limit: 10,
},
svcReq: notifiers.PageMetadata{
Offset: 0,
Limit: 10,
},
svcRes: notifiers.Page{},
svcErr: svcerr.ErrAuthentication,
response: sdk.SubscriptionPage{},
err: errors.NewSDKErrorWithStatus(svcerr.ErrAuthentication, http.StatusUnauthorized),
},
{
desc: "list subscription with empty token",
token: "",
pageMeta: sdk.PageMetadata{
Offset: 0,
Limit: 10,
},
svcReq: notifiers.PageMetadata{},
svcRes: notifiers.Page{},
svcErr: nil,
response: sdk.SubscriptionPage{},
err: errors.NewSDKErrorWithStatus(errors.Wrap(apiutil.ErrValidation, apiutil.ErrBearerToken), http.StatusUnauthorized),
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
svcCall := nsvc.On("ListSubscriptions", mock.Anything, tc.token, tc.svcReq).Return(tc.svcRes, tc.svcErr)
resp, err := mgsdk.ListSubscriptions(tc.pageMeta, tc.token)
assert.Equal(t, tc.err, err)
assert.Equal(t, tc.response, resp)
if tc.err == nil {
ok := svcCall.Parent.AssertCalled(t, "ListSubscriptions", mock.Anything, tc.token, tc.svcReq)
assert.True(t, ok)
}
svcCall.Unset()
})
}
}
func TestDeleteSubscription(t *testing.T) {
ts, nsvc := setupSubscriptions()
defer ts.Close()
sdkConf := sdk.Config{
UsersURL: ts.URL,
MsgContentType: contentType,
TLSVerification: false,
}
mgsdk := sdk.NewSDK(sdkConf)
cases := []struct {
desc string
subID string
token string
svcErr error
err errors.SDKError
}{
{
desc: "delete existing subscription",
subID: subID,
token: validToken,
svcErr: nil,
err: nil,
},
{
desc: "delete non-existent subscription",
subID: wrongID,
token: validToken,
svcErr: svcerr.ErrRemoveEntity,
err: errors.NewSDKErrorWithStatus(svcerr.ErrRemoveEntity, http.StatusUnprocessableEntity),
},
{
desc: "delete subscription with invalid token",
subID: subID,
token: invalidToken,
svcErr: svcerr.ErrAuthentication,
err: errors.NewSDKErrorWithStatus(svcerr.ErrAuthentication, http.StatusUnauthorized),
},
{
desc: "delete subscription with empty token",
subID: subID,
token: "",
svcErr: nil,
err: errors.NewSDKErrorWithStatus(errors.Wrap(apiutil.ErrValidation, apiutil.ErrBearerToken), http.StatusUnauthorized),
},
{
desc: "delete subscription with empty subID",
subID: "",
token: validToken,
svcErr: nil,
err: errors.NewSDKErrorWithStatus(errors.Wrap(apiutil.ErrValidation, apiutil.ErrMissingID), http.StatusBadRequest),
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
svcCall := nsvc.On("RemoveSubscription", mock.Anything, tc.token, tc.subID).Return(tc.svcErr)
err := mgsdk.DeleteSubscription(tc.subID, tc.token)
assert.Equal(t, tc.err, err)
if tc.err == nil {
ok := svcCall.Parent.AssertCalled(t, "RemoveSubscription", mock.Anything, tc.token, tc.subID)
assert.True(t, ok)
}
svcCall.Unset()
})
}
}
+77
View File
@@ -0,0 +1,77 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package sdk
import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"
"github.com/absmach/supermq/pkg/errors"
)
const channelParts = 2
func (sdk mgSDK) ReadMessages(pm MessagePageMetadata, chanName, domainID, token string) (MessagesPage, errors.SDKError) {
chanNameParts := strings.SplitN(chanName, ".", channelParts)
chanID := chanNameParts[0]
subtopicPart := ""
if len(chanNameParts) == channelParts {
subtopicPart = fmt.Sprintf("?subtopic=%s", chanNameParts[1])
}
msgURL, err := sdk.withMessageQueryParams(sdk.readersURL, fmt.Sprintf("channels/%s/messages%s", chanID, subtopicPart), pm)
if err != nil {
return MessagesPage{}, errors.NewSDKError(err)
}
header := make(map[string]string)
header["Content-Type"] = string(sdk.msgContentType)
_, body, sdkerr := sdk.processRequest(http.MethodGet, msgURL, token, nil, header, http.StatusOK)
if sdkerr != nil {
return MessagesPage{}, sdkerr
}
var mp MessagesPage
if err := json.Unmarshal(body, &mp); err != nil {
return MessagesPage{}, errors.NewSDKError(err)
}
return mp, nil
}
func (sdk mgSDK) withMessageQueryParams(baseURL, endpoint string, mpm MessagePageMetadata) (string, error) {
b, err := json.Marshal(mpm)
if err != nil {
return "", err
}
q := map[string]interface{}{}
if err := json.Unmarshal(b, &q); err != nil {
return "", err
}
ret := url.Values{}
for k, v := range q {
switch t := v.(type) {
case string:
ret.Add(k, t)
case float64:
ret.Add(k, strconv.FormatFloat(t, 'f', -1, 64))
case uint64:
ret.Add(k, strconv.FormatUint(t, 10))
case int64:
ret.Add(k, strconv.FormatInt(t, 10))
case json.Number:
ret.Add(k, t.String())
case bool:
ret.Add(k, strconv.FormatBool(t))
}
}
qs := ret.Encode()
return fmt.Sprintf("%s/%s?%s", baseURL, endpoint, qs), nil
}
+246
View File
@@ -0,0 +1,246 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package sdk_test
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
sdk "github.com/absmach/magistrala/pkg/sdk"
readersapi "github.com/absmach/magistrala/readers/api"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
apiutil "github.com/absmach/supermq/api/http/util"
chmocks "github.com/absmach/supermq/channels/mocks"
climocks "github.com/absmach/supermq/clients/mocks"
smqauthn "github.com/absmach/supermq/pkg/authn"
authnmocks "github.com/absmach/supermq/pkg/authn/mocks"
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/transformers/senml"
"github.com/absmach/supermq/readers"
readersmocks "github.com/absmach/supermq/readers/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
var (
channelsGRPCClient *chmocks.ChannelsServiceClient
clientsGRPCClient *climocks.ClientsServiceClient
)
func setupReaders() (*httptest.Server, *authnmocks.Authentication, *readersmocks.MessageRepository) {
repo := new(readersmocks.MessageRepository)
authn := new(authnmocks.Authentication)
clientsGRPCClient = new(climocks.ClientsServiceClient)
channelsGRPCClient = new(chmocks.ChannelsServiceClient)
mux := readersapi.MakeHandler(repo, authn, clientsGRPCClient, channelsGRPCClient, "test", "")
return httptest.NewServer(mux), authn, repo
}
func TestReadMessages(t *testing.T) {
ts, authn, repo := setupReaders()
defer ts.Close()
channelID := "channelID"
msgValue := 1.6
boolVal := true
msg := senml.Message{
Name: "current",
Time: 1720000000,
Value: &msgValue,
Publisher: validID,
}
invalidMsg := "[{\"n\":\"current\",\"t\":-1,\"v\":1.6}]"
sdkConf := sdk.Config{
ReaderURL: ts.URL,
}
mgsdk := sdk.NewSDK(sdkConf)
cases := []struct {
desc string
token string
chanName string
domainID string
messagePageMeta sdk.MessagePageMetadata
authzErr error
authnErr error
repoRes readers.MessagesPage
repoErr error
response sdk.MessagesPage
err errors.SDKError
}{
{
desc: "read messages successfully",
token: validToken,
chanName: channelID,
domainID: validID,
messagePageMeta: sdk.MessagePageMetadata{
PageMetadata: sdk.PageMetadata{
Offset: 0,
Limit: 10,
Level: 0,
},
Publisher: validID,
BoolValue: &boolVal,
},
repoRes: readers.MessagesPage{
Total: 1,
Messages: []readers.Message{msg},
},
repoErr: nil,
response: sdk.MessagesPage{
PageRes: sdk.PageRes{
Total: 1,
},
Messages: []senml.Message{msg},
},
err: nil,
},
{
desc: "read messages successfully with subtopic",
token: validToken,
chanName: channelID + ".subtopic",
domainID: validID,
messagePageMeta: sdk.MessagePageMetadata{
PageMetadata: sdk.PageMetadata{
Offset: 0,
Limit: 10,
},
Publisher: validID,
},
repoRes: readers.MessagesPage{
Total: 1,
Messages: []readers.Message{msg},
},
repoErr: nil,
response: sdk.MessagesPage{
PageRes: sdk.PageRes{
Total: 1,
},
Messages: []senml.Message{msg},
},
err: nil,
},
{
desc: "read messages with invalid token",
token: invalidToken,
chanName: channelID,
domainID: validID,
messagePageMeta: sdk.MessagePageMetadata{
PageMetadata: sdk.PageMetadata{
Offset: 0,
Limit: 10,
},
Subtopic: "subtopic",
Publisher: validID,
},
authzErr: svcerr.ErrAuthorization,
repoRes: readers.MessagesPage{},
response: sdk.MessagesPage{},
err: errors.NewSDKErrorWithStatus(errors.Wrap(svcerr.ErrAuthorization, svcerr.ErrAuthorization), http.StatusUnauthorized),
},
{
desc: "read messages with empty token",
token: "",
chanName: channelID,
domainID: validID,
messagePageMeta: sdk.MessagePageMetadata{
PageMetadata: sdk.PageMetadata{
Offset: 0,
Limit: 10,
},
Subtopic: "subtopic",
Publisher: validID,
},
authnErr: svcerr.ErrAuthentication,
repoRes: readers.MessagesPage{},
response: sdk.MessagesPage{},
err: errors.NewSDKErrorWithStatus(errors.Wrap(apiutil.ErrValidation, apiutil.ErrBearerToken), http.StatusUnauthorized),
},
{
desc: "read messages with empty channel ID",
token: validToken,
chanName: "",
domainID: validID,
messagePageMeta: sdk.MessagePageMetadata{
PageMetadata: sdk.PageMetadata{
Offset: 0,
Limit: 10,
},
Subtopic: "subtopic",
Publisher: validID,
},
repoRes: readers.MessagesPage{},
repoErr: nil,
response: sdk.MessagesPage{},
err: errors.NewSDKErrorWithStatus(errors.Wrap(apiutil.ErrValidation, apiutil.ErrMissingID), http.StatusBadRequest),
},
{
desc: "read messages with invalid message page metadata",
token: validToken,
chanName: channelID,
domainID: validID,
messagePageMeta: sdk.MessagePageMetadata{
PageMetadata: sdk.PageMetadata{
Offset: 0,
Limit: 10,
Metadata: map[string]interface{}{
"key": make(chan int),
},
},
Subtopic: "subtopic",
Publisher: validID,
},
repoRes: readers.MessagesPage{},
repoErr: nil,
response: sdk.MessagesPage{},
err: errors.NewSDKError(errors.New("json: unsupported type: chan int")),
},
{
desc: "read messages with response that cannot be unmarshalled",
token: validToken,
chanName: channelID,
domainID: validID,
messagePageMeta: sdk.MessagePageMetadata{
PageMetadata: sdk.PageMetadata{
Offset: 0,
Limit: 10,
},
Subtopic: "subtopic",
Publisher: validID,
},
repoRes: readers.MessagesPage{
Total: 1,
Messages: []readers.Message{invalidMsg},
},
repoErr: nil,
response: sdk.MessagesPage{},
err: errors.NewSDKError(errors.New("json: cannot unmarshal string into Go struct field MessagesPage.messages of type senml.Message")),
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
authCall1 := authn.On("Authenticate", mock.Anything, tc.token).Return(smqauthn.Session{UserID: validID}, tc.authnErr)
authzCall := channelsGRPCClient.On("Authorize", mock.Anything, mock.Anything).Return(&grpcChannelsV1.AuthzRes{Authorized: true}, tc.authzErr)
repoCall := repo.On("ReadAll", channelID, mock.Anything).Return(tc.repoRes, tc.repoErr)
response, err := mgsdk.ReadMessages(tc.messagePageMeta, tc.chanName, tc.domainID, tc.token)
fmt.Println(err)
assert.Equal(t, tc.err, err)
assert.Equal(t, tc.response, response)
if tc.err == nil {
ok := repoCall.Parent.AssertCalled(t, "ReadAll", channelID, mock.Anything)
assert.True(t, ok)
}
authCall1.Unset()
authzCall.Unset()
repoCall.Unset()
})
}
}
+4347 -19
View File
File diff suppressed because it is too large Load Diff
+12
View File
@@ -3,6 +3,8 @@
package sdk
import "github.com/absmach/supermq/pkg/transformers/senml"
type PageRes struct {
Total uint64 `json:"total"`
Offset uint64 `json:"offset"`
@@ -14,3 +16,13 @@ type BootstrapPage struct {
Configs []BootstrapConfig `json:"configs"`
PageRes
}
type SubscriptionPage struct {
Subscriptions []Subscription `json:"subscriptions"`
PageRes
}
type MessagesPage struct {
Messages []senml.Message `json:"messages,omitempty"`
PageRes
}
+86 -5
View File
@@ -29,6 +29,26 @@ type PageMetadata struct {
Offset uint64 `json:"offset"`
Limit uint64 `json:"limit"`
Metadata Metadata `json:"metadata,omitempty"`
Topic string `json:"topic,omitempty"`
Contact string `json:"contact,omitempty"`
DomainID string `json:"domain_id,omitempty"`
Level uint64 `json:"level,omitempty"`
}
type MessagePageMetadata struct {
PageMetadata
Subtopic string `json:"subtopic,omitempty"`
Publisher string `json:"publisher,omitempty"`
Comparator string `json:"comparator,omitempty"`
BoolValue *bool `json:"vb,omitempty"`
StringValue string `json:"vs,omitempty"`
DataValue string `json:"vd,omitempty"`
From float64 `json:"from,omitempty"`
To float64 `json:"to,omitempty"`
Aggregation string `json:"aggregation,omitempty"`
Interval string `json:"interval,omitempty"`
Value float64 `json:"value,omitempty"`
Protocol string `json:"protocol,omitempty"`
}
// SDK contains Magistrala API.
@@ -124,12 +144,58 @@ type SDK interface {
// err := sdk.Whitelist("clientID", 1, "domainID", "token")
// fmt.Println(err)
Whitelist(clientID string, state int, domainID, token string) errors.SDKError
// ReadMessages read messages of specified channel.
//
// example:
// pm := sdk.MessagePageMetadata{
// Offset: 0,
// Limit: 10,
// }
// msgs, _ := sdk.ReadMessages(pm,"channelID", "domainID", "token")
// fmt.Println(msgs)
ReadMessages(pm MessagePageMetadata, chanID, domainID, token string) (MessagesPage, errors.SDKError)
// CreateSubscription creates a new subscription
//
// example:
// subscription, _ := sdk.CreateSubscription("topic", "contact", "token")
// fmt.Println(subscription)
CreateSubscription(topic, contact, token string) (string, errors.SDKError)
// ListSubscriptions list subscriptions given list parameters.
//
// example:
// pm := sdk.PageMetadata{
// Offset: 0,
// Limit: 10,
// }
// subscriptions, _ := sdk.ListSubscriptions(pm, "token")
// fmt.Println(subscriptions)
ListSubscriptions(pm PageMetadata, token string) (SubscriptionPage, errors.SDKError)
// ViewSubscription retrieves a subscription with the provided id.
//
// example:
// subscription, _ := sdk.ViewSubscription("id", "token")
// fmt.Println(subscription)
ViewSubscription(id, token string) (Subscription, errors.SDKError)
// DeleteSubscription removes a subscription with the provided id.
//
// example:
// err := sdk.DeleteSubscription("id", "token")
// fmt.Println(err)
DeleteSubscription(id, token string) errors.SDKError
}
type mgSDK struct {
bootstrapURL string
client *http.Client
curlFlag bool
bootstrapURL string
readersURL string
usersURL string
client *http.Client
curlFlag bool
msgContentType smqSDK.ContentType
smqSDK.SDK
}
@@ -159,7 +225,6 @@ func NewSDK(conf Config) SDK {
smqSDK := smqSDK.NewSDK(smqSDK.Config{
CertsURL: conf.CertsURL,
HTTPAdapterURL: conf.HTTPAdapterURL,
ReaderURL: conf.ReaderURL,
ClientsURL: conf.ClientsURL,
UsersURL: conf.UsersURL,
GroupsURL: conf.GroupsURL,
@@ -175,7 +240,11 @@ func NewSDK(conf Config) SDK {
})
return &mgSDK{
bootstrapURL: conf.BootstrapURL,
bootstrapURL: conf.BootstrapURL,
readersURL: conf.ReaderURL,
usersURL: conf.UsersURL,
msgContentType: conf.MsgContentType,
client: &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
@@ -265,6 +334,18 @@ func (pm PageMetadata) query() (string, error) {
}
q.Add("metadata", string(md))
}
if pm.Topic != "" {
q.Add("topic", pm.Topic)
}
if pm.Contact != "" {
q.Add("contact", pm.Contact)
}
if pm.DomainID != "" {
q.Add("domain_id", pm.DomainID)
}
if pm.Level != 0 {
q.Add("level", strconv.FormatUint(pm.Level, 10))
}
return q.Encode(), nil
}
+1 -1
View File
@@ -12,6 +12,7 @@ import (
"time"
"github.com/absmach/magistrala/internal/testsutil"
"github.com/absmach/magistrala/readers/api"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1"
apiutil "github.com/absmach/supermq/api/http/util"
@@ -22,7 +23,6 @@ import (
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/transformers/senml"
"github.com/absmach/supermq/readers"
"github.com/absmach/supermq/readers/api"
"github.com/absmach/supermq/readers/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
-84
View File
@@ -1,84 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package readers
import "errors"
const (
// EqualKey represents the equal comparison operator key.
EqualKey = "eq"
// LowerThanKey represents the lower-than comparison operator key.
LowerThanKey = "lt"
// LowerThanEqualKey represents the lower-than-or-equal comparison operator key.
LowerThanEqualKey = "le"
// GreaterThanKey represents the greater-than-or-equal comparison operator key.
GreaterThanKey = "gt"
// GreaterThanEqualKey represents the greater-than-or-equal comparison operator key.
GreaterThanEqualKey = "ge"
)
// ErrReadMessages indicates failure occurred while reading messages from database.
var ErrReadMessages = errors.New("failed to read messages from database")
// MessageRepository specifies message reader API.
//
//go:generate mockery --name MessageRepository --output=./mocks --filename messages.go --quiet --note "Copyright (c) Abstract Machines"
type MessageRepository interface {
// ReadAll skips given number of messages for given channel and returns next
// limited number of messages.
ReadAll(chanID string, pm PageMetadata) (MessagesPage, error)
}
// Message represents any message format.
type Message interface{}
// MessagesPage contains page related metadata as well as list of messages that
// belong to this page.
type MessagesPage struct {
PageMetadata
Total uint64
Messages []Message
}
// PageMetadata represents the parameters used to create database queries.
type PageMetadata struct {
Offset uint64 `json:"offset"`
Limit uint64 `json:"limit"`
Subtopic string `json:"subtopic,omitempty"`
Publisher string `json:"publisher,omitempty"`
Protocol string `json:"protocol,omitempty"`
Name string `json:"name,omitempty"`
Value float64 `json:"v,omitempty"`
Comparator string `json:"comparator,omitempty"`
BoolValue bool `json:"vb,omitempty"`
StringValue string `json:"vs,omitempty"`
DataValue string `json:"vd,omitempty"`
From float64 `json:"from,omitempty"`
To float64 `json:"to,omitempty"`
Format string `json:"format,omitempty"`
Aggregation string `json:"aggregation,omitempty"`
Interval string `json:"interval,omitempty"`
}
// ParseValueComparator convert comparison operator keys into mathematic anotation.
func ParseValueComparator(query map[string]interface{}) string {
comparator := "="
val, ok := query["comparator"]
if ok {
switch val.(string) {
case EqualKey:
comparator = "="
case LowerThanKey:
comparator = "<"
case LowerThanEqualKey:
comparator = "<="
case GreaterThanKey:
comparator = ">"
case GreaterThanEqualKey:
comparator = ">="
}
}
return comparator
}
-5
View File
@@ -1,5 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package mocks contains mocks for testing purposes.
package mocks
-57
View File
@@ -1,57 +0,0 @@
// Code generated by mockery v2.43.2. DO NOT EDIT.
// Copyright (c) Abstract Machines
package mocks
import (
readers "github.com/absmach/magistrala/readers"
mock "github.com/stretchr/testify/mock"
)
// MessageRepository is an autogenerated mock type for the MessageRepository type
type MessageRepository struct {
mock.Mock
}
// ReadAll provides a mock function with given fields: chanID, pm
func (_m *MessageRepository) ReadAll(chanID string, pm readers.PageMetadata) (readers.MessagesPage, error) {
ret := _m.Called(chanID, pm)
if len(ret) == 0 {
panic("no return value specified for ReadAll")
}
var r0 readers.MessagesPage
var r1 error
if rf, ok := ret.Get(0).(func(string, readers.PageMetadata) (readers.MessagesPage, error)); ok {
return rf(chanID, pm)
}
if rf, ok := ret.Get(0).(func(string, readers.PageMetadata) readers.MessagesPage); ok {
r0 = rf(chanID, pm)
} else {
r0 = ret.Get(0).(readers.MessagesPage)
}
if rf, ok := ret.Get(1).(func(string, readers.PageMetadata) error); ok {
r1 = rf(chanID, pm)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// NewMessageRepository creates a new instance of MessageRepository. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMessageRepository(t interface {
mock.TestingT
Cleanup(func())
}) *MessageRepository {
mock := &MessageRepository{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
+1 -1
View File
@@ -7,7 +7,7 @@ package main
import (
"log"
"github.com/absmach/supermq/tools/e2e"
"github.com/absmach/magistrala/tools/e2e"
cc "github.com/ivanpirog/coloredcobra"
"github.com/spf13/cobra"
)
+1 -1
View File
@@ -7,7 +7,7 @@ package main
import (
"log"
bench "github.com/absmach/supermq/tools/mqtt-bench"
bench "github.com/absmach/magistrala/tools/mqtt-bench"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
+1 -1
View File
@@ -7,7 +7,7 @@ package main
import (
"log"
"github.com/absmach/supermq/tools/provision"
"github.com/absmach/magistrala/tools/provision"
"github.com/spf13/cobra"
)