mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 07:10:19 +00:00
NOISSUE - Add Linters (#79)
* add linters Signed-off-by: SammyOina <sammyoina@gmail.com> * fix linting errors Signed-off-by: sammy <sammyoina@gmail.com> * fix linting Signed-off-by: sammy <sammyoina@gmail.com> * add callhome alias Signed-off-by: sammy <sammyoina@gmail.com> --------- Signed-off-by: SammyOina <sammyoina@gmail.com> Signed-off-by: sammy <sammyoina@gmail.com>
This commit is contained in:
committed by
GitHub
parent
7692546c1c
commit
6f0874c85a
@@ -14,10 +14,25 @@ issues:
|
||||
- "string `For example:\n` has (\\d+) occurrences, make it a constant"
|
||||
|
||||
linters-settings:
|
||||
importas:
|
||||
no-unaliased: true
|
||||
no-extra-aliases: false
|
||||
alias:
|
||||
- pkg: github.com/mainflux/callhome/pkg/client
|
||||
alias: chclient
|
||||
- pkg: github.com/absmach/magistrala/logger
|
||||
alias: mglog
|
||||
gocritic:
|
||||
enabled-checks:
|
||||
- captLocal
|
||||
- singleCaseSwitch
|
||||
- switchTrue
|
||||
- importShadow
|
||||
- httpNoBody
|
||||
- paramTypeCombine
|
||||
- emptyStringTest
|
||||
- builtinShadow
|
||||
- exposedSyncMutex
|
||||
disabled-checks:
|
||||
- appendAssign
|
||||
enabled-tags:
|
||||
|
||||
+30
-30
@@ -166,8 +166,8 @@ func NewClient(conn *grpc.ClientConn, timeout time.Duration) magistrala.AuthServ
|
||||
}
|
||||
|
||||
func (client grpcClient) Issue(ctx context.Context, req *magistrala.IssueReq, _ ...grpc.CallOption) (*magistrala.Token, error) {
|
||||
ctx, close := context.WithTimeout(ctx, client.timeout)
|
||||
defer close()
|
||||
ctx, cancel := context.WithTimeout(ctx, client.timeout)
|
||||
defer cancel()
|
||||
|
||||
res, err := client.issue(ctx, issueReq{userID: req.GetUserId(), domainID: req.GetDomainId(), keyType: auth.KeyType(req.Type)})
|
||||
if err != nil {
|
||||
@@ -186,8 +186,8 @@ func decodeIssueResponse(_ context.Context, grpcRes interface{}) (interface{}, e
|
||||
}
|
||||
|
||||
func (client grpcClient) Refresh(ctx context.Context, req *magistrala.RefreshReq, _ ...grpc.CallOption) (*magistrala.Token, error) {
|
||||
ctx, close := context.WithTimeout(ctx, client.timeout)
|
||||
defer close()
|
||||
ctx, cancel := context.WithTimeout(ctx, client.timeout)
|
||||
defer cancel()
|
||||
|
||||
res, err := client.refresh(ctx, refreshReq{refreshToken: req.GetRefreshToken(), domainID: req.GetDomainId()})
|
||||
if err != nil {
|
||||
@@ -206,8 +206,8 @@ func decodeRefreshResponse(_ context.Context, grpcRes interface{}) (interface{},
|
||||
}
|
||||
|
||||
func (client grpcClient) Identify(ctx context.Context, token *magistrala.IdentityReq, _ ...grpc.CallOption) (*magistrala.IdentityRes, error) {
|
||||
ctx, close := context.WithTimeout(ctx, client.timeout)
|
||||
defer close()
|
||||
ctx, cancel := context.WithTimeout(ctx, client.timeout)
|
||||
defer cancel()
|
||||
|
||||
res, err := client.identify(ctx, identityReq{token: token.GetToken()})
|
||||
if err != nil {
|
||||
@@ -229,8 +229,8 @@ func decodeIdentifyResponse(_ context.Context, grpcRes interface{}) (interface{}
|
||||
}
|
||||
|
||||
func (client grpcClient) Authorize(ctx context.Context, req *magistrala.AuthorizeReq, _ ...grpc.CallOption) (r *magistrala.AuthorizeRes, err error) {
|
||||
ctx, close := context.WithTimeout(ctx, client.timeout)
|
||||
defer close()
|
||||
ctx, cancel := context.WithTimeout(ctx, client.timeout)
|
||||
defer cancel()
|
||||
|
||||
res, err := client.authorize(ctx, authReq{
|
||||
Domain: req.GetDomain(),
|
||||
@@ -270,8 +270,8 @@ func encodeAuthorizeRequest(_ context.Context, grpcReq interface{}) (interface{}
|
||||
}
|
||||
|
||||
func (client grpcClient) AddPolicy(ctx context.Context, in *magistrala.AddPolicyReq, opts ...grpc.CallOption) (*magistrala.AddPolicyRes, error) {
|
||||
ctx, close := context.WithTimeout(ctx, client.timeout)
|
||||
defer close()
|
||||
ctx, cancel := context.WithTimeout(ctx, client.timeout)
|
||||
defer cancel()
|
||||
|
||||
res, err := client.addPolicy(ctx, policyReq{
|
||||
Domain: in.GetDomain(),
|
||||
@@ -313,8 +313,8 @@ func encodeAddPolicyRequest(_ context.Context, grpcReq interface{}) (interface{}
|
||||
}
|
||||
|
||||
func (client grpcClient) AddPolicies(ctx context.Context, in *magistrala.AddPoliciesReq, opts ...grpc.CallOption) (*magistrala.AddPoliciesRes, error) {
|
||||
ctx, close := context.WithTimeout(ctx, client.timeout)
|
||||
defer close()
|
||||
ctx, cancel := context.WithTimeout(ctx, client.timeout)
|
||||
defer cancel()
|
||||
r := policiesReq{}
|
||||
if in.GetAddPoliciesReq() != nil {
|
||||
for _, mgApr := range in.GetAddPoliciesReq() {
|
||||
@@ -368,8 +368,8 @@ func encodeAddPoliciesRequest(_ context.Context, grpcReq interface{}) (interface
|
||||
}
|
||||
|
||||
func (client grpcClient) DeletePolicy(ctx context.Context, in *magistrala.DeletePolicyReq, opts ...grpc.CallOption) (*magistrala.DeletePolicyRes, error) {
|
||||
ctx, close := context.WithTimeout(ctx, client.timeout)
|
||||
defer close()
|
||||
ctx, cancel := context.WithTimeout(ctx, client.timeout)
|
||||
defer cancel()
|
||||
|
||||
res, err := client.deletePolicy(ctx, policyReq{
|
||||
Domain: in.GetDomain(),
|
||||
@@ -411,8 +411,8 @@ func encodeDeletePolicyRequest(_ context.Context, grpcReq interface{}) (interfac
|
||||
}
|
||||
|
||||
func (client grpcClient) DeletePolicies(ctx context.Context, in *magistrala.DeletePoliciesReq, opts ...grpc.CallOption) (*magistrala.DeletePoliciesRes, error) {
|
||||
ctx, close := context.WithTimeout(ctx, client.timeout)
|
||||
defer close()
|
||||
ctx, cancel := context.WithTimeout(ctx, client.timeout)
|
||||
defer cancel()
|
||||
r := policiesReq{}
|
||||
|
||||
if in.GetDeletePoliciesReq() != nil {
|
||||
@@ -466,8 +466,8 @@ func encodeDeletePoliciesRequest(_ context.Context, grpcReq interface{}) (interf
|
||||
}
|
||||
|
||||
func (client grpcClient) ListObjects(ctx context.Context, in *magistrala.ListObjectsReq, opts ...grpc.CallOption) (*magistrala.ListObjectsRes, error) {
|
||||
ctx, close := context.WithTimeout(ctx, client.timeout)
|
||||
defer close()
|
||||
ctx, cancel := context.WithTimeout(ctx, client.timeout)
|
||||
defer cancel()
|
||||
|
||||
res, err := client.listObjects(ctx, listObjectsReq{
|
||||
Domain: in.GetDomain(),
|
||||
@@ -505,8 +505,8 @@ func encodeListObjectsRequest(_ context.Context, grpcReq interface{}) (interface
|
||||
}
|
||||
|
||||
func (client grpcClient) ListAllObjects(ctx context.Context, in *magistrala.ListObjectsReq, opts ...grpc.CallOption) (*magistrala.ListObjectsRes, error) {
|
||||
ctx, close := context.WithTimeout(ctx, client.timeout)
|
||||
defer close()
|
||||
ctx, cancel := context.WithTimeout(ctx, client.timeout)
|
||||
defer cancel()
|
||||
|
||||
res, err := client.listAllObjects(ctx, listObjectsReq{
|
||||
Domain: in.GetDomain(),
|
||||
@@ -526,8 +526,8 @@ func (client grpcClient) ListAllObjects(ctx context.Context, in *magistrala.List
|
||||
}
|
||||
|
||||
func (client grpcClient) CountObjects(ctx context.Context, in *magistrala.CountObjectsReq, opts ...grpc.CallOption) (*magistrala.CountObjectsRes, error) {
|
||||
ctx, close := context.WithTimeout(ctx, client.timeout)
|
||||
defer close()
|
||||
ctx, cancel := context.WithTimeout(ctx, client.timeout)
|
||||
defer cancel()
|
||||
|
||||
res, err := client.countObjects(ctx, listObjectsReq{
|
||||
Domain: in.GetDomain(),
|
||||
@@ -565,8 +565,8 @@ func encodeCountObjectsRequest(_ context.Context, grpcReq interface{}) (interfac
|
||||
}
|
||||
|
||||
func (client grpcClient) ListSubjects(ctx context.Context, in *magistrala.ListSubjectsReq, opts ...grpc.CallOption) (*magistrala.ListSubjectsRes, error) {
|
||||
ctx, close := context.WithTimeout(ctx, client.timeout)
|
||||
defer close()
|
||||
ctx, cancel := context.WithTimeout(ctx, client.timeout)
|
||||
defer cancel()
|
||||
|
||||
res, err := client.listSubjects(ctx, listSubjectsReq{
|
||||
Domain: in.GetDomain(),
|
||||
@@ -605,8 +605,8 @@ func encodeListSubjectsRequest(_ context.Context, grpcReq interface{}) (interfac
|
||||
}
|
||||
|
||||
func (client grpcClient) ListAllSubjects(ctx context.Context, in *magistrala.ListSubjectsReq, opts ...grpc.CallOption) (*magistrala.ListSubjectsRes, error) {
|
||||
ctx, close := context.WithTimeout(ctx, client.timeout)
|
||||
defer close()
|
||||
ctx, cancel := context.WithTimeout(ctx, client.timeout)
|
||||
defer cancel()
|
||||
|
||||
res, err := client.listAllSubjects(ctx, listSubjectsReq{
|
||||
Domain: in.GetDomain(),
|
||||
@@ -626,8 +626,8 @@ func (client grpcClient) ListAllSubjects(ctx context.Context, in *magistrala.Lis
|
||||
}
|
||||
|
||||
func (client grpcClient) CountSubjects(ctx context.Context, in *magistrala.CountSubjectsReq, opts ...grpc.CallOption) (*magistrala.CountSubjectsRes, error) {
|
||||
ctx, close := context.WithTimeout(ctx, client.timeout)
|
||||
defer close()
|
||||
ctx, cancel := context.WithTimeout(ctx, client.timeout)
|
||||
defer cancel()
|
||||
|
||||
res, err := client.countSubjects(ctx, countSubjectsReq{
|
||||
Domain: in.GetDomain(),
|
||||
@@ -665,8 +665,8 @@ func encodeCountSubjectsRequest(_ context.Context, grpcReq interface{}) (interfa
|
||||
}
|
||||
|
||||
func (client grpcClient) ListPermissions(ctx context.Context, in *magistrala.ListPermissionsReq, opts ...grpc.CallOption) (*magistrala.ListPermissionsRes, error) {
|
||||
ctx, close := context.WithTimeout(ctx, client.timeout)
|
||||
defer close()
|
||||
ctx, cancel := context.WithTimeout(ctx, client.timeout)
|
||||
defer cancel()
|
||||
|
||||
res, err := client.listPermissions(ctx, listPermissionsReq{
|
||||
Domain: in.GetDomain(),
|
||||
|
||||
@@ -7,13 +7,13 @@ import (
|
||||
"github.com/absmach/magistrala/auth"
|
||||
"github.com/absmach/magistrala/internal/api"
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/go-chi/chi/v5"
|
||||
kithttp "github.com/go-kit/kit/transport/http"
|
||||
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
|
||||
)
|
||||
|
||||
func MakeHandler(svc auth.Service, mux *chi.Mux, logger logger.Logger) *chi.Mux {
|
||||
func MakeHandler(svc auth.Service, mux *chi.Mux, logger mglog.Logger) *chi.Mux {
|
||||
opts := []kithttp.ServerOption{
|
||||
kithttp.ServerErrorEncoder(apiutil.LoggingErrorEncoder(logger, api.EncodeError)),
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ import (
|
||||
"github.com/absmach/magistrala/auth/jwt"
|
||||
"github.com/absmach/magistrala/auth/mocks"
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/pkg/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
@@ -76,7 +76,7 @@ func newService() auth.Service {
|
||||
}
|
||||
|
||||
func newServer(svc auth.Service) *httptest.Server {
|
||||
logger := logger.NewMock()
|
||||
logger := mglog.NewMock()
|
||||
mux := httpapi.MakeHandler(svc, logger, "")
|
||||
return httptest.NewServer(mux)
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
"github.com/absmach/magistrala"
|
||||
"github.com/absmach/magistrala/auth"
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
"github.com/go-chi/chi/v5"
|
||||
kithttp "github.com/go-kit/kit/transport/http"
|
||||
@@ -21,7 +21,7 @@ import (
|
||||
const contentType = "application/json"
|
||||
|
||||
// MakeHandler returns a HTTP handler for API endpoints.
|
||||
func MakeHandler(svc auth.Service, mux *chi.Mux, logger logger.Logger) *chi.Mux {
|
||||
func MakeHandler(svc auth.Service, mux *chi.Mux, logger mglog.Logger) *chi.Mux {
|
||||
opts := []kithttp.ServerOption{
|
||||
kithttp.ServerErrorEncoder(apiutil.LoggingErrorEncoder(logger, encodeError)),
|
||||
}
|
||||
|
||||
@@ -9,13 +9,13 @@ import (
|
||||
"github.com/absmach/magistrala/auth"
|
||||
"github.com/absmach/magistrala/auth/api/http/domains"
|
||||
"github.com/absmach/magistrala/auth/api/http/keys"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
)
|
||||
|
||||
// MakeHandler returns a HTTP handler for API endpoints.
|
||||
func MakeHandler(svc auth.Service, logger logger.Logger, instanceID string) http.Handler {
|
||||
func MakeHandler(svc auth.Service, logger mglog.Logger, instanceID string) http.Handler {
|
||||
mux := chi.NewRouter()
|
||||
|
||||
mux = keys.MakeHandler(svc, mux, logger)
|
||||
|
||||
+9
-9
@@ -11,18 +11,18 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/absmach/magistrala/auth"
|
||||
log "github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
)
|
||||
|
||||
var _ auth.Service = (*loggingMiddleware)(nil)
|
||||
|
||||
type loggingMiddleware struct {
|
||||
logger log.Logger
|
||||
logger mglog.Logger
|
||||
svc auth.Service
|
||||
}
|
||||
|
||||
// LoggingMiddleware adds logging facilities to the core service.
|
||||
func LoggingMiddleware(svc auth.Service, logger log.Logger) auth.Service {
|
||||
func LoggingMiddleware(svc auth.Service, logger mglog.Logger) auth.Service {
|
||||
return &loggingMiddleware{logger, svc}
|
||||
}
|
||||
|
||||
@@ -244,7 +244,7 @@ func (lm *loggingMiddleware) CreateDomain(ctx context.Context, token string, d a
|
||||
return lm.svc.CreateDomain(ctx, token, d)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) RetrieveDomain(ctx context.Context, token string, id string) (do auth.Domain, err error) {
|
||||
func (lm *loggingMiddleware) RetrieveDomain(ctx context.Context, token, id string) (do auth.Domain, err error) {
|
||||
defer func(begin time.Time) {
|
||||
message := fmt.Sprintf("Method retrieve_domain for domain id %s took %s to complete", id, time.Since(begin))
|
||||
if err != nil {
|
||||
@@ -256,7 +256,7 @@ func (lm *loggingMiddleware) RetrieveDomain(ctx context.Context, token string, i
|
||||
return lm.svc.RetrieveDomain(ctx, token, id)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) UpdateDomain(ctx context.Context, token string, id string, d auth.DomainReq) (do auth.Domain, err error) {
|
||||
func (lm *loggingMiddleware) UpdateDomain(ctx context.Context, token, id string, d auth.DomainReq) (do auth.Domain, err error) {
|
||||
defer func(begin time.Time) {
|
||||
message := fmt.Sprintf("Method update_domain for domain id %s took %s to complete", id, time.Since(begin))
|
||||
if err != nil {
|
||||
@@ -268,7 +268,7 @@ func (lm *loggingMiddleware) UpdateDomain(ctx context.Context, token string, id
|
||||
return lm.svc.UpdateDomain(ctx, token, id, d)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) ChangeDomainStatus(ctx context.Context, token string, id string, d auth.DomainReq) (do auth.Domain, err error) {
|
||||
func (lm *loggingMiddleware) ChangeDomainStatus(ctx context.Context, token, id string, d auth.DomainReq) (do auth.Domain, err error) {
|
||||
defer func(begin time.Time) {
|
||||
message := fmt.Sprintf("Method change_domain_status for domain id %s took %s to complete", id, time.Since(begin))
|
||||
if err != nil {
|
||||
@@ -292,7 +292,7 @@ func (lm *loggingMiddleware) ListDomains(ctx context.Context, token string, page
|
||||
return lm.svc.ListDomains(ctx, token, page)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) AssignUsers(ctx context.Context, token string, id string, userIds []string, relation string) (err error) {
|
||||
func (lm *loggingMiddleware) AssignUsers(ctx context.Context, token, id string, userIds []string, relation string) (err error) {
|
||||
defer func(begin time.Time) {
|
||||
message := fmt.Sprintf("Method assign_users took %s to complete", time.Since(begin))
|
||||
if err != nil {
|
||||
@@ -304,7 +304,7 @@ func (lm *loggingMiddleware) AssignUsers(ctx context.Context, token string, id s
|
||||
return lm.svc.AssignUsers(ctx, token, id, userIds, relation)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) UnassignUsers(ctx context.Context, token string, id string, userIds []string, relation string) (err error) {
|
||||
func (lm *loggingMiddleware) UnassignUsers(ctx context.Context, token, id string, userIds []string, relation string) (err error) {
|
||||
defer func(begin time.Time) {
|
||||
message := fmt.Sprintf("Method unassign_users took %s to complete", time.Since(begin))
|
||||
if err != nil {
|
||||
@@ -316,7 +316,7 @@ func (lm *loggingMiddleware) UnassignUsers(ctx context.Context, token string, id
|
||||
return lm.svc.UnassignUsers(ctx, token, id, userIds, relation)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) ListUserDomains(ctx context.Context, token string, userID string, page auth.Page) (do auth.DomainsPage, err error) {
|
||||
func (lm *loggingMiddleware) ListUserDomains(ctx context.Context, token, userID string, page auth.Page) (do auth.DomainsPage, err error) {
|
||||
defer func(begin time.Time) {
|
||||
message := fmt.Sprintf("Method list_user_domains took %s to complete", time.Since(begin))
|
||||
if err != nil {
|
||||
|
||||
+6
-6
@@ -176,7 +176,7 @@ func (ms *metricsMiddleware) CreateDomain(ctx context.Context, token string, d a
|
||||
return ms.svc.CreateDomain(ctx, token, d)
|
||||
}
|
||||
|
||||
func (ms *metricsMiddleware) RetrieveDomain(ctx context.Context, token string, id string) (auth.Domain, error) {
|
||||
func (ms *metricsMiddleware) RetrieveDomain(ctx context.Context, token, id string) (auth.Domain, error) {
|
||||
defer func(begin time.Time) {
|
||||
ms.counter.With("method", "retrieve_domain").Add(1)
|
||||
ms.latency.With("method", "retrieve_domain").Observe(time.Since(begin).Seconds())
|
||||
@@ -184,7 +184,7 @@ func (ms *metricsMiddleware) RetrieveDomain(ctx context.Context, token string, i
|
||||
return ms.svc.RetrieveDomain(ctx, token, id)
|
||||
}
|
||||
|
||||
func (ms *metricsMiddleware) UpdateDomain(ctx context.Context, token string, id string, d auth.DomainReq) (auth.Domain, error) {
|
||||
func (ms *metricsMiddleware) UpdateDomain(ctx context.Context, token, id string, d auth.DomainReq) (auth.Domain, error) {
|
||||
defer func(begin time.Time) {
|
||||
ms.counter.With("method", "update_domain").Add(1)
|
||||
ms.latency.With("method", "update_domain").Observe(time.Since(begin).Seconds())
|
||||
@@ -192,7 +192,7 @@ func (ms *metricsMiddleware) UpdateDomain(ctx context.Context, token string, id
|
||||
return ms.svc.UpdateDomain(ctx, token, id, d)
|
||||
}
|
||||
|
||||
func (ms *metricsMiddleware) ChangeDomainStatus(ctx context.Context, token string, id string, d auth.DomainReq) (auth.Domain, error) {
|
||||
func (ms *metricsMiddleware) ChangeDomainStatus(ctx context.Context, token, id string, d auth.DomainReq) (auth.Domain, error) {
|
||||
defer func(begin time.Time) {
|
||||
ms.counter.With("method", "change_domain_status").Add(1)
|
||||
ms.latency.With("method", "change_domain_status").Observe(time.Since(begin).Seconds())
|
||||
@@ -208,7 +208,7 @@ func (ms *metricsMiddleware) ListDomains(ctx context.Context, token string, page
|
||||
return ms.svc.ListDomains(ctx, token, page)
|
||||
}
|
||||
|
||||
func (ms *metricsMiddleware) AssignUsers(ctx context.Context, token string, id string, userIds []string, relation string) error {
|
||||
func (ms *metricsMiddleware) AssignUsers(ctx context.Context, token, id string, userIds []string, relation string) error {
|
||||
defer func(begin time.Time) {
|
||||
ms.counter.With("method", "assign_users").Add(1)
|
||||
ms.latency.With("method", "assign_users").Observe(time.Since(begin).Seconds())
|
||||
@@ -216,7 +216,7 @@ func (ms *metricsMiddleware) AssignUsers(ctx context.Context, token string, id s
|
||||
return ms.svc.AssignUsers(ctx, token, id, userIds, relation)
|
||||
}
|
||||
|
||||
func (ms *metricsMiddleware) UnassignUsers(ctx context.Context, token string, id string, userIds []string, relation string) error {
|
||||
func (ms *metricsMiddleware) UnassignUsers(ctx context.Context, token, id string, userIds []string, relation string) error {
|
||||
defer func(begin time.Time) {
|
||||
ms.counter.With("method", "unassign_users").Add(1)
|
||||
ms.latency.With("method", "unassign_users").Observe(time.Since(begin).Seconds())
|
||||
@@ -224,7 +224,7 @@ func (ms *metricsMiddleware) UnassignUsers(ctx context.Context, token string, id
|
||||
return ms.svc.UnassignUsers(ctx, token, id, userIds, relation)
|
||||
}
|
||||
|
||||
func (ms *metricsMiddleware) ListUserDomains(ctx context.Context, token string, userID string, page auth.Page) (auth.DomainsPage, error) {
|
||||
func (ms *metricsMiddleware) ListUserDomains(ctx context.Context, token, userID string, page auth.Page) (auth.DomainsPage, error) {
|
||||
defer func(begin time.Time) {
|
||||
ms.counter.With("method", "list_user_domains").Add(1)
|
||||
ms.latency.With("method", "list_user_domains").Observe(time.Since(begin).Seconds())
|
||||
|
||||
@@ -39,7 +39,7 @@ func (m *DomainsRepo) ListDomains(ctx context.Context, pm auth.Page) (auth.Domai
|
||||
return ret.Get(0).(auth.DomainsPage), ret.Error(1)
|
||||
}
|
||||
|
||||
func (m *DomainsRepo) Update(ctx context.Context, id string, userID string, d auth.DomainReq) (auth.Domain, error) {
|
||||
func (m *DomainsRepo) Update(ctx context.Context, id, userID string, d auth.DomainReq) (auth.Domain, error) {
|
||||
ret := m.Called(ctx, d, id, userID)
|
||||
|
||||
return ret.Get(0).(auth.Domain), ret.Error(1)
|
||||
|
||||
@@ -204,7 +204,7 @@ func (repo domainRepo) ListDomains(ctx context.Context, pm auth.Page) (auth.Doma
|
||||
}
|
||||
|
||||
// Update updates the client name and metadata.
|
||||
func (repo domainRepo) Update(ctx context.Context, id string, userID string, dr auth.DomainReq) (auth.Domain, error) {
|
||||
func (repo domainRepo) Update(ctx context.Context, id, userID string, dr auth.DomainReq) (auth.Domain, error) {
|
||||
var query []string
|
||||
var upq string
|
||||
var ws string = "AND status = :status"
|
||||
|
||||
+7
-7
@@ -504,7 +504,7 @@ func (svc service) CreateDomain(ctx context.Context, token string, d Domain) (do
|
||||
return svc.domains.Save(ctx, d)
|
||||
}
|
||||
|
||||
func (svc service) RetrieveDomain(ctx context.Context, token string, id string) (Domain, error) {
|
||||
func (svc service) RetrieveDomain(ctx context.Context, token, id string) (Domain, error) {
|
||||
if err := svc.Authorize(ctx, PolicyReq{
|
||||
Subject: token,
|
||||
SubjectType: UserType,
|
||||
@@ -519,7 +519,7 @@ func (svc service) RetrieveDomain(ctx context.Context, token string, id string)
|
||||
return svc.domains.RetrieveByID(ctx, id)
|
||||
}
|
||||
|
||||
func (svc service) UpdateDomain(ctx context.Context, token string, id string, d DomainReq) (Domain, error) {
|
||||
func (svc service) UpdateDomain(ctx context.Context, token, id string, d DomainReq) (Domain, error) {
|
||||
key, err := svc.Identify(ctx, token)
|
||||
if err != nil {
|
||||
return Domain{}, errors.Wrap(svcerr.ErrAuthentication, err)
|
||||
@@ -537,7 +537,7 @@ func (svc service) UpdateDomain(ctx context.Context, token string, id string, d
|
||||
return svc.domains.Update(ctx, id, key.User, d)
|
||||
}
|
||||
|
||||
func (svc service) ChangeDomainStatus(ctx context.Context, token string, id string, d DomainReq) (Domain, error) {
|
||||
func (svc service) ChangeDomainStatus(ctx context.Context, token, id string, d DomainReq) (Domain, error) {
|
||||
key, err := svc.Identify(ctx, token)
|
||||
if err != nil {
|
||||
return Domain{}, errors.Wrap(svcerr.ErrAuthentication, err)
|
||||
@@ -582,7 +582,7 @@ func (svc service) ListDomains(ctx context.Context, token string, p Page) (Domai
|
||||
return dp, nil
|
||||
}
|
||||
|
||||
func (svc service) AssignUsers(ctx context.Context, token string, id string, userIds []string, relation string) error {
|
||||
func (svc service) AssignUsers(ctx context.Context, token, id string, userIds []string, relation string) error {
|
||||
if err := svc.Authorize(ctx, PolicyReq{
|
||||
Subject: token,
|
||||
SubjectType: UserType,
|
||||
@@ -620,7 +620,7 @@ func (svc service) AssignUsers(ctx context.Context, token string, id string, use
|
||||
return svc.addDomainPolicies(ctx, id, relation, userIds...)
|
||||
}
|
||||
|
||||
func (svc service) UnassignUsers(ctx context.Context, token string, id string, userIds []string, relation string) error {
|
||||
func (svc service) UnassignUsers(ctx context.Context, token, id string, userIds []string, relation string) error {
|
||||
if err := svc.Authorize(ctx, PolicyReq{
|
||||
Subject: token,
|
||||
SubjectType: UserType,
|
||||
@@ -647,7 +647,7 @@ func (svc service) UnassignUsers(ctx context.Context, token string, id string, u
|
||||
}
|
||||
|
||||
// IMPROVEMENT NOTE: Take decision: Only Patform admin or both Patform and domain admins can see others users domain.
|
||||
func (svc service) ListUserDomains(ctx context.Context, token string, userID string, p Page) (DomainsPage, error) {
|
||||
func (svc service) ListUserDomains(ctx context.Context, token, userID string, p Page) (DomainsPage, error) {
|
||||
res, err := svc.Identify(ctx, token)
|
||||
if err != nil {
|
||||
return DomainsPage{}, errors.Wrap(svcerr.ErrAuthentication, err)
|
||||
@@ -803,7 +803,7 @@ func (svc service) removeDomainPolicies(ctx context.Context, domainID, relation
|
||||
return svc.domains.DeletePolicies(ctx, pcs...)
|
||||
}
|
||||
|
||||
func EncodeDomainUserID(domainID string, userID string) string {
|
||||
func EncodeDomainUserID(domainID, userID string) string {
|
||||
if domainID == "" || userID == "" {
|
||||
return ""
|
||||
}
|
||||
|
||||
@@ -239,7 +239,7 @@ func (tm *tracingMiddleware) CreateDomain(ctx context.Context, token string, d a
|
||||
return tm.svc.CreateDomain(ctx, token, d)
|
||||
}
|
||||
|
||||
func (tm *tracingMiddleware) RetrieveDomain(ctx context.Context, token string, id string) (auth.Domain, error) {
|
||||
func (tm *tracingMiddleware) RetrieveDomain(ctx context.Context, token, id string) (auth.Domain, error) {
|
||||
ctx, span := tm.tracer.Start(ctx, "view_domain", trace.WithAttributes(
|
||||
attribute.String("id", id),
|
||||
))
|
||||
@@ -247,7 +247,7 @@ func (tm *tracingMiddleware) RetrieveDomain(ctx context.Context, token string, i
|
||||
return tm.svc.RetrieveDomain(ctx, token, id)
|
||||
}
|
||||
|
||||
func (tm *tracingMiddleware) UpdateDomain(ctx context.Context, token string, id string, d auth.DomainReq) (auth.Domain, error) {
|
||||
func (tm *tracingMiddleware) UpdateDomain(ctx context.Context, token, id string, d auth.DomainReq) (auth.Domain, error) {
|
||||
ctx, span := tm.tracer.Start(ctx, "update_domain", trace.WithAttributes(
|
||||
attribute.String("id", id),
|
||||
))
|
||||
@@ -255,7 +255,7 @@ func (tm *tracingMiddleware) UpdateDomain(ctx context.Context, token string, id
|
||||
return tm.svc.UpdateDomain(ctx, token, id, d)
|
||||
}
|
||||
|
||||
func (tm *tracingMiddleware) ChangeDomainStatus(ctx context.Context, token string, id string, d auth.DomainReq) (auth.Domain, error) {
|
||||
func (tm *tracingMiddleware) ChangeDomainStatus(ctx context.Context, token, id string, d auth.DomainReq) (auth.Domain, error) {
|
||||
ctx, span := tm.tracer.Start(ctx, "change_domain_status", trace.WithAttributes(
|
||||
attribute.String("id", id),
|
||||
))
|
||||
@@ -269,7 +269,7 @@ func (tm *tracingMiddleware) ListDomains(ctx context.Context, token string, p au
|
||||
return tm.svc.ListDomains(ctx, token, p)
|
||||
}
|
||||
|
||||
func (tm *tracingMiddleware) AssignUsers(ctx context.Context, token string, id string, userIds []string, relation string) error {
|
||||
func (tm *tracingMiddleware) AssignUsers(ctx context.Context, token, id string, userIds []string, relation string) error {
|
||||
ctx, span := tm.tracer.Start(ctx, "assign_users", trace.WithAttributes(
|
||||
attribute.String("id", id),
|
||||
attribute.StringSlice("user_ids", userIds),
|
||||
@@ -279,7 +279,7 @@ func (tm *tracingMiddleware) AssignUsers(ctx context.Context, token string, id s
|
||||
return tm.svc.AssignUsers(ctx, token, id, userIds, relation)
|
||||
}
|
||||
|
||||
func (tm *tracingMiddleware) UnassignUsers(ctx context.Context, token string, id string, userIds []string, relation string) error {
|
||||
func (tm *tracingMiddleware) UnassignUsers(ctx context.Context, token, id string, userIds []string, relation string) error {
|
||||
ctx, span := tm.tracer.Start(ctx, "unassign_users", trace.WithAttributes(
|
||||
attribute.String("id", id),
|
||||
attribute.StringSlice("user_ids", userIds),
|
||||
@@ -289,7 +289,7 @@ func (tm *tracingMiddleware) UnassignUsers(ctx context.Context, token string, id
|
||||
return tm.svc.UnassignUsers(ctx, token, id, userIds, relation)
|
||||
}
|
||||
|
||||
func (tm *tracingMiddleware) ListUserDomains(ctx context.Context, token string, userID string, p auth.Page) (auth.DomainsPage, error) {
|
||||
func (tm *tracingMiddleware) ListUserDomains(ctx context.Context, token, userID string, p auth.Page) (auth.DomainsPage, error) {
|
||||
ctx, span := tm.tracer.Start(ctx, "list_user_domains", trace.WithAttributes(
|
||||
attribute.String("user_id", userID),
|
||||
))
|
||||
|
||||
@@ -185,13 +185,13 @@ func dec(in []byte) ([]byte, error) {
|
||||
}
|
||||
|
||||
func newService(url string, auth magistrala.AuthServiceClient) bootstrap.Service {
|
||||
things := mocks.NewConfigsRepository()
|
||||
thingsRepo := mocks.NewConfigsRepository()
|
||||
config := mgsdk.Config{
|
||||
ThingsURL: url,
|
||||
}
|
||||
|
||||
sdk := mgsdk.NewSDK(config)
|
||||
return bootstrap.New(auth, things, sdk, encKey)
|
||||
return bootstrap.New(auth, thingsRepo, sdk, encKey)
|
||||
}
|
||||
|
||||
func newThingsService() (things.Service, mggroups.Service, *thmocks.Repository, *chmocks.Repository, *authmocks.Service) {
|
||||
|
||||
@@ -10,13 +10,13 @@ import (
|
||||
|
||||
"github.com/absmach/magistrala/bootstrap/postgres"
|
||||
pgclient "github.com/absmach/magistrala/internal/clients/postgres"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/ory/dockertest/v3"
|
||||
)
|
||||
|
||||
var (
|
||||
testLog, _ = logger.New(os.Stdout, logger.Info.String())
|
||||
testLog, _ = mglog.New(os.Stdout, mglog.Info.String())
|
||||
db *sqlx.DB
|
||||
)
|
||||
|
||||
|
||||
@@ -67,13 +67,13 @@ var (
|
||||
)
|
||||
|
||||
func newService(url string, auth magistrala.AuthServiceClient) bootstrap.Service {
|
||||
things := mocks.NewConfigsRepository()
|
||||
thingsRepo := mocks.NewConfigsRepository()
|
||||
config := mgsdk.Config{
|
||||
ThingsURL: url,
|
||||
}
|
||||
|
||||
sdk := mgsdk.NewSDK(config)
|
||||
return bootstrap.New(auth, things, sdk, encKey)
|
||||
return bootstrap.New(auth, thingsRepo, sdk, encKey)
|
||||
}
|
||||
|
||||
func newThingsService() (things.Service, mggroups.Service, *thmocks.Repository, *chmocks.Repository, *authmocks.Service) {
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
"github.com/absmach/magistrala"
|
||||
"github.com/absmach/magistrala/certs"
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
"github.com/go-chi/chi/v5"
|
||||
kithttp "github.com/go-kit/kit/transport/http"
|
||||
@@ -28,7 +28,7 @@ const (
|
||||
)
|
||||
|
||||
// MakeHandler returns a HTTP handler for API endpoints.
|
||||
func MakeHandler(svc certs.Service, logger logger.Logger, instanceID string) http.Handler {
|
||||
func MakeHandler(svc certs.Service, logger mglog.Logger, instanceID string) http.Handler {
|
||||
opts := []kithttp.ServerOption{
|
||||
kithttp.ServerErrorEncoder(apiutil.LoggingErrorEncoder(logger, encodeError)),
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
|
||||
"github.com/absmach/magistrala/certs"
|
||||
"github.com/absmach/magistrala/internal/postgres"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
"github.com/jackc/pgerrcode"
|
||||
"github.com/jackc/pgx/v5/pgconn"
|
||||
@@ -29,12 +29,12 @@ type Cert struct {
|
||||
|
||||
type certsRepository struct {
|
||||
db postgres.Database
|
||||
log logger.Logger
|
||||
log mglog.Logger
|
||||
}
|
||||
|
||||
// NewRepository instantiates a PostgreSQL implementation of certs
|
||||
// repository.
|
||||
func NewRepository(db postgres.Database, log logger.Logger) certs.Repository {
|
||||
func NewRepository(db postgres.Database, log mglog.Logger) certs.Repository {
|
||||
return &certsRepository{db: db, log: log}
|
||||
}
|
||||
|
||||
|
||||
@@ -11,13 +11,13 @@ import (
|
||||
|
||||
"github.com/absmach/magistrala/certs/postgres"
|
||||
pgclient "github.com/absmach/magistrala/internal/clients/postgres"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/ory/dockertest/v3"
|
||||
)
|
||||
|
||||
var (
|
||||
testLog, _ = logger.New(os.Stdout, logger.Info.String())
|
||||
testLog, _ = mglog.New(os.Stdout, mglog.Info.String())
|
||||
db *sqlx.DB
|
||||
)
|
||||
|
||||
|
||||
+3
-3
@@ -56,12 +56,12 @@ type certsService struct {
|
||||
}
|
||||
|
||||
// New returns new Certs service.
|
||||
func New(auth magistrala.AuthServiceClient, certs Repository, sdk mgsdk.SDK, pki pki.Agent) Service {
|
||||
func New(auth magistrala.AuthServiceClient, certs Repository, sdk mgsdk.SDK, pkiAgent pki.Agent) Service {
|
||||
return &certsService{
|
||||
certsRepo: certs,
|
||||
sdk: sdk,
|
||||
auth: auth,
|
||||
pki: pki,
|
||||
pki: pkiAgent,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -83,7 +83,7 @@ type Cert struct {
|
||||
Expire time.Time `json:"expire" mapstructure:"-"`
|
||||
}
|
||||
|
||||
func (cs *certsService) IssueCert(ctx context.Context, token, thingID string, ttl string) (Cert, error) {
|
||||
func (cs *certsService) IssueCert(ctx context.Context, token, thingID, ttl string) (Cert, error) {
|
||||
owner, err := cs.auth.Identify(ctx, &magistrala.IdentityReq{Token: token})
|
||||
if err != nil {
|
||||
return Cert{}, errors.Wrap(svcerr.ErrAuthentication, err)
|
||||
|
||||
@@ -16,7 +16,7 @@ import (
|
||||
"github.com/absmach/magistrala/certs"
|
||||
"github.com/absmach/magistrala/certs/mocks"
|
||||
chmocks "github.com/absmach/magistrala/internal/groups/mocks"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/pkg/clients"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
svcerr "github.com/absmach/magistrala/pkg/errors/service"
|
||||
@@ -51,7 +51,7 @@ const (
|
||||
)
|
||||
|
||||
func newThingsServer(svc things.Service) *httptest.Server {
|
||||
logger := logger.NewMock()
|
||||
logger := mglog.NewMock()
|
||||
mux := chi.NewMux()
|
||||
httpapi.MakeHandler(svc, nil, mux, logger, instanceID)
|
||||
return httptest.NewServer(mux)
|
||||
|
||||
+1
-1
@@ -170,7 +170,7 @@ func NewConfigCmd() *cobra.Command {
|
||||
}
|
||||
}
|
||||
|
||||
func setConfigValue(key string, value string) error {
|
||||
func setConfigValue(key, value string) error {
|
||||
config, err := read(ConfigPath)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -166,7 +166,7 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func newService(ctx context.Context, auth magistrala.AuthServiceClient, db *sqlx.DB, tracer trace.Tracer, logger mglog.Logger, cfg config, dbConfig pgclient.Config) (bootstrap.Service, error) {
|
||||
func newService(ctx context.Context, authClient magistrala.AuthServiceClient, db *sqlx.DB, tracer trace.Tracer, logger mglog.Logger, cfg config, dbConfig pgclient.Config) (bootstrap.Service, error) {
|
||||
database := postgres.NewDatabase(db, dbConfig, tracer)
|
||||
|
||||
repoConfig := bootstrappg.NewConfigRepository(database, logger)
|
||||
@@ -177,7 +177,7 @@ func newService(ctx context.Context, auth magistrala.AuthServiceClient, db *sqlx
|
||||
|
||||
sdk := mgsdk.NewSDK(config)
|
||||
|
||||
svc := bootstrap.New(auth, repoConfig, sdk, []byte(cfg.EncKey))
|
||||
svc := bootstrap.New(authClient, repoConfig, sdk, []byte(cfg.EncKey))
|
||||
|
||||
var err error
|
||||
svc, err = producer.NewEventStoreMiddleware(ctx, svc, cfg.ESURL)
|
||||
|
||||
+2
-2
@@ -170,14 +170,14 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func newService(auth magistrala.AuthServiceClient, db *sqlx.DB, tracer trace.Tracer, logger mglog.Logger, cfg config, dbConfig pgclient.Config, pkiAgent vault.Agent) certs.Service {
|
||||
func newService(authClient magistrala.AuthServiceClient, db *sqlx.DB, tracer trace.Tracer, logger mglog.Logger, cfg config, dbConfig pgclient.Config, pkiAgent vault.Agent) certs.Service {
|
||||
database := postgres.NewDatabase(db, dbConfig, tracer)
|
||||
certsRepo := certspg.NewRepository(database, logger)
|
||||
config := mgsdk.Config{
|
||||
ThingsURL: cfg.ThingsURL,
|
||||
}
|
||||
sdk := mgsdk.NewSDK(config)
|
||||
svc := certs.New(auth, certsRepo, sdk, pkiAgent)
|
||||
svc := certs.New(authClient, certsRepo, sdk, pkiAgent)
|
||||
svc = api.LoggingMiddleware(svc, logger)
|
||||
counter, latency := internal.MakeMetrics(svcName, "api")
|
||||
svc = api.MetricsMiddleware(svc, counter, latency)
|
||||
|
||||
+2
-2
@@ -159,10 +159,10 @@ func newService(pub messaging.Publisher, tc magistrala.AuthzServiceClient, logge
|
||||
return svc
|
||||
}
|
||||
|
||||
func proxyHTTP(ctx context.Context, cfg server.Config, logger mglog.Logger, handler session.Handler) error {
|
||||
func proxyHTTP(ctx context.Context, cfg server.Config, logger mglog.Logger, sessionHandler session.Handler) error {
|
||||
address := fmt.Sprintf("%s:%s", "", cfg.Port)
|
||||
target := fmt.Sprintf("%s:%s", targetHTTPHost, targetHTTPPort)
|
||||
mp, err := mproxy.NewProxy(address, target, handler, logger)
|
||||
mp, err := mproxy.NewProxy(address, target, sessionHandler, logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
+4
-4
@@ -166,9 +166,9 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func connectToMQTTBroker(url, user, password string, timeout time.Duration, logger mglog.Logger) (mqttpaho.Client, error) {
|
||||
func connectToMQTTBroker(burl, user, password string, timeout time.Duration, logger mglog.Logger) (mqttpaho.Client, error) {
|
||||
opts := mqttpaho.NewClientOptions()
|
||||
opts.AddBroker(url)
|
||||
opts.AddBroker(burl)
|
||||
opts.SetUsername(user)
|
||||
opts.SetPassword(password)
|
||||
opts.SetOnConnectHandler(func(_ mqttpaho.Client) {
|
||||
@@ -188,9 +188,9 @@ func connectToMQTTBroker(url, user, password string, timeout time.Duration, logg
|
||||
}
|
||||
|
||||
func subscribeToLoRaBroker(svc lora.Service, mc mqttpaho.Client, timeout time.Duration, topic string, logger mglog.Logger) error {
|
||||
mqtt := mqtt.NewBroker(svc, mc, timeout, logger)
|
||||
mqttBroker := mqtt.NewBroker(svc, mc, timeout, logger)
|
||||
logger.Info("Subscribed to Lora MQTT broker")
|
||||
if err := mqtt.Subscribe(topic); err != nil {
|
||||
if err := mqttBroker.Subscribe(topic); err != nil {
|
||||
return fmt.Errorf("failed to subscribe to Lora MQTT broker: %s", err)
|
||||
}
|
||||
return nil
|
||||
|
||||
+5
-5
@@ -208,14 +208,14 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func proxyMQTT(ctx context.Context, cfg config, logger mglog.Logger, handler session.Handler) error {
|
||||
func proxyMQTT(ctx context.Context, cfg config, logger mglog.Logger, sessionHandler session.Handler) error {
|
||||
address := fmt.Sprintf(":%s", cfg.MQTTPort)
|
||||
target := fmt.Sprintf("%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort)
|
||||
mp := mp.New(address, target, handler, logger)
|
||||
mproxy := mp.New(address, target, sessionHandler, logger)
|
||||
|
||||
errCh := make(chan error)
|
||||
go func() {
|
||||
errCh <- mp.Listen(ctx)
|
||||
errCh <- mproxy.Listen(ctx)
|
||||
}()
|
||||
|
||||
select {
|
||||
@@ -227,9 +227,9 @@ func proxyMQTT(ctx context.Context, cfg config, logger mglog.Logger, handler ses
|
||||
}
|
||||
}
|
||||
|
||||
func proxyWS(ctx context.Context, cfg config, logger mglog.Logger, handler session.Handler) error {
|
||||
func proxyWS(ctx context.Context, cfg config, logger mglog.Logger, sessionHandler session.Handler) error {
|
||||
target := fmt.Sprintf("%s:%s", cfg.HTTPTargetHost, cfg.HTTPTargetPort)
|
||||
wp := websocket.New(target, cfg.HTTPTargetPath, "ws", handler, logger)
|
||||
wp := websocket.New(target, cfg.HTTPTargetPath, "ws", sessionHandler, logger)
|
||||
http.Handle("/mqtt", wp.Handler())
|
||||
|
||||
errCh := make(chan error)
|
||||
|
||||
@@ -172,12 +172,12 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func newService(db *sqlx.DB, tracer trace.Tracer, auth magistrala.AuthServiceClient, c config, sc mgsmpp.Config, logger mglog.Logger) notifiers.Service {
|
||||
func newService(db *sqlx.DB, tracer trace.Tracer, authClient magistrala.AuthServiceClient, c config, sc mgsmpp.Config, logger mglog.Logger) notifiers.Service {
|
||||
database := notifierpg.NewDatabase(db, tracer)
|
||||
repo := tracing.New(tracer, notifierpg.New(database))
|
||||
idp := ulid.New()
|
||||
notifier := mgsmpp.New(sc)
|
||||
svc := notifiers.New(auth, repo, idp, notifier, c.From)
|
||||
svc := notifiers.New(authClient, repo, idp, notifier, c.From)
|
||||
svc = api.LoggingMiddleware(svc, logger)
|
||||
counter, latency := internal.MakeMetrics("notifier", "smpp")
|
||||
svc = api.MetricsMiddleware(svc, counter, latency)
|
||||
|
||||
@@ -182,7 +182,7 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func newService(db *sqlx.DB, tracer trace.Tracer, auth magistrala.AuthServiceClient, c config, ec email.Config, logger mglog.Logger) (notifiers.Service, error) {
|
||||
func newService(db *sqlx.DB, tracer trace.Tracer, authClient magistrala.AuthServiceClient, c config, ec email.Config, logger mglog.Logger) (notifiers.Service, error) {
|
||||
database := notifierpg.NewDatabase(db, tracer)
|
||||
repo := tracing.New(tracer, notifierpg.New(database))
|
||||
idp := ulid.New()
|
||||
@@ -193,7 +193,7 @@ func newService(db *sqlx.DB, tracer trace.Tracer, auth magistrala.AuthServiceCli
|
||||
}
|
||||
|
||||
notifier := smtp.New(agent)
|
||||
svc := notifiers.New(auth, repo, idp, notifier, c.From)
|
||||
svc := notifiers.New(authClient, repo, idp, notifier, c.From)
|
||||
svc = api.LoggingMiddleware(svc, logger)
|
||||
counter, latency := internal.MakeMetrics("notifier", "smtp")
|
||||
svc = api.MetricsMiddleware(svc, counter, latency)
|
||||
|
||||
+5
-5
@@ -42,7 +42,7 @@ import (
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/jmoiron/sqlx"
|
||||
callhome "github.com/mainflux/callhome/pkg/client"
|
||||
chclient "github.com/mainflux/callhome/pkg/client"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/grpc"
|
||||
@@ -193,7 +193,7 @@ func main() {
|
||||
gs := grpcserver.New(ctx, cancel, svcName, grpcServerConfig, regiterAuthzServer, logger)
|
||||
|
||||
if cfg.SendTelemetry {
|
||||
chc := callhome.New(svcName, magistrala.Version, logger, cancel)
|
||||
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
|
||||
go chc.CallHome(ctx)
|
||||
}
|
||||
|
||||
@@ -215,7 +215,7 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func newService(ctx context.Context, db *sqlx.DB, dbConfig pgclient.Config, auth magistrala.AuthServiceClient, cacheClient *redis.Client, keyDuration time.Duration, esURL string, tracer trace.Tracer, logger mglog.Logger) (things.Service, groups.Service, error) {
|
||||
func newService(ctx context.Context, db *sqlx.DB, dbConfig pgclient.Config, authClient magistrala.AuthServiceClient, cacheClient *redis.Client, keyDuration time.Duration, esURL string, tracer trace.Tracer, logger mglog.Logger) (things.Service, groups.Service, error) {
|
||||
database := postgres.NewDatabase(db, dbConfig, tracer)
|
||||
cRepo := thingspg.NewRepository(database)
|
||||
gRepo := gpostgres.New(database)
|
||||
@@ -224,8 +224,8 @@ func newService(ctx context.Context, db *sqlx.DB, dbConfig pgclient.Config, auth
|
||||
|
||||
thingCache := thcache.NewCache(cacheClient, keyDuration)
|
||||
|
||||
csvc := things.NewService(auth, cRepo, gRepo, thingCache, idp)
|
||||
gsvc := mggroups.NewService(gRepo, idp, auth)
|
||||
csvc := things.NewService(authClient, cRepo, gRepo, thingCache, idp)
|
||||
gsvc := mggroups.NewService(gRepo, idp, authClient)
|
||||
|
||||
csvc, err := thevents.NewEventStoreMiddleware(ctx, csvc, esURL)
|
||||
if err != nil {
|
||||
|
||||
+8
-8
@@ -190,7 +190,7 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func newService(ctx context.Context, auth magistrala.AuthServiceClient, db *sqlx.DB, dbConfig pgclient.Config, tracer trace.Tracer, c config, ec email.Config, logger mglog.Logger) (users.Service, groups.Service, error) {
|
||||
func newService(ctx context.Context, authClient magistrala.AuthServiceClient, db *sqlx.DB, dbConfig pgclient.Config, tracer trace.Tracer, c config, ec email.Config, logger mglog.Logger) (users.Service, groups.Service, error) {
|
||||
database := postgres.NewDatabase(db, dbConfig, tracer)
|
||||
cRepo := clientspg.NewRepository(database)
|
||||
gRepo := gpostgres.New(database)
|
||||
@@ -198,13 +198,13 @@ func newService(ctx context.Context, auth magistrala.AuthServiceClient, db *sqlx
|
||||
idp := uuid.New()
|
||||
hsr := hasher.New()
|
||||
|
||||
emailer, err := emailer.New(c.ResetURL, &ec)
|
||||
emailerClient, err := emailer.New(c.ResetURL, &ec)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("failed to configure e-mailing util: %s", err.Error()))
|
||||
}
|
||||
|
||||
csvc := users.NewService(cRepo, auth, emailer, hsr, idp, c.PassRegex, c.SelfRegister)
|
||||
gsvc := mggroups.NewService(gRepo, idp, auth)
|
||||
csvc := users.NewService(cRepo, authClient, emailerClient, hsr, idp, c.PassRegex, c.SelfRegister)
|
||||
gsvc := mggroups.NewService(gRepo, idp, authClient)
|
||||
|
||||
csvc, err = uevents.NewEventStoreMiddleware(ctx, csvc, c.ESURL)
|
||||
if err != nil {
|
||||
@@ -229,7 +229,7 @@ func newService(ctx context.Context, auth magistrala.AuthServiceClient, db *sqlx
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("failed to create admin client: %s", err))
|
||||
}
|
||||
if err := createAdminPolicy(ctx, clientID, auth); err != nil {
|
||||
if err := createAdminPolicy(ctx, clientID, authClient); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return csvc, gsvc, err
|
||||
@@ -275,8 +275,8 @@ func createAdmin(ctx context.Context, c config, crepo clientspg.Repository, hsr
|
||||
return client.ID, nil
|
||||
}
|
||||
|
||||
func createAdminPolicy(ctx context.Context, clientID string, auth magistrala.AuthServiceClient) error {
|
||||
res, err := auth.Authorize(ctx, &magistrala.AuthorizeReq{
|
||||
func createAdminPolicy(ctx context.Context, clientID string, authClient magistrala.AuthServiceClient) error {
|
||||
res, err := authClient.Authorize(ctx, &magistrala.AuthorizeReq{
|
||||
SubjectType: authSvc.UserType,
|
||||
Subject: clientID,
|
||||
Permission: authSvc.AdministratorRelation,
|
||||
@@ -284,7 +284,7 @@ func createAdminPolicy(ctx context.Context, clientID string, auth magistrala.Aut
|
||||
ObjectType: authSvc.PlatformType,
|
||||
})
|
||||
if err != nil || !res.Authorized {
|
||||
addPolicyRes, err := auth.AddPolicy(ctx, &magistrala.AddPolicyReq{
|
||||
addPolicyRes, err := authClient.AddPolicy(ctx, &magistrala.AddPolicyReq{
|
||||
SubjectType: authSvc.UserType,
|
||||
Subject: clientID,
|
||||
Relation: authSvc.AdministratorRelation,
|
||||
|
||||
+1
-1
@@ -162,7 +162,7 @@ func newService(tc magistrala.AuthzServiceClient, nps messaging.PubSub, logger m
|
||||
return svc
|
||||
}
|
||||
|
||||
func proxyWS(ctx context.Context, hostConfig server.Config, targetConfig server.Config, logger mglog.Logger, handler session.Handler) error {
|
||||
func proxyWS(ctx context.Context, hostConfig, targetConfig server.Config, logger mglog.Logger, handler session.Handler) error {
|
||||
target := fmt.Sprintf("ws://%s:%s", targetConfig.Host, targetConfig.Port)
|
||||
address := fmt.Sprintf("%s:%s", hostConfig.Host, hostConfig.Port)
|
||||
wp, err := websockets.NewProxy(address, target, logger, handler)
|
||||
|
||||
+3
-3
@@ -9,7 +9,7 @@ import (
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
"github.com/absmach/magistrala/pkg/messaging"
|
||||
"github.com/plgd-dev/go-coap/v2/message"
|
||||
@@ -39,11 +39,11 @@ type client struct {
|
||||
client mux.Client
|
||||
token message.Token
|
||||
observe uint32
|
||||
logger logger.Logger
|
||||
logger mglog.Logger
|
||||
}
|
||||
|
||||
// NewClient instantiates a new Observer.
|
||||
func NewClient(c mux.Client, tkn message.Token, l logger.Logger) Client {
|
||||
func NewClient(c mux.Client, tkn message.Token, l mglog.Logger) Client {
|
||||
return &client{
|
||||
client: c,
|
||||
token: tkn,
|
||||
|
||||
@@ -43,7 +43,7 @@ func (tm *tracingServiceMiddleware) Publish(ctx context.Context, key string, msg
|
||||
}
|
||||
|
||||
// Subscribe traces a CoAP subscribe operation.
|
||||
func (tm *tracingServiceMiddleware) Subscribe(ctx context.Context, key string, chanID string, subtopic string, c coap.Client) error {
|
||||
func (tm *tracingServiceMiddleware) Subscribe(ctx context.Context, key, chanID, subtopic string, c coap.Client) error {
|
||||
ctx, span := tm.tracer.Start(ctx, subscribeOP, trace.WithAttributes(
|
||||
attribute.String("channel_id", chanID),
|
||||
attribute.String("subtopic", subtopic),
|
||||
@@ -53,7 +53,7 @@ func (tm *tracingServiceMiddleware) Subscribe(ctx context.Context, key string, c
|
||||
}
|
||||
|
||||
// Unsubscribe traces a CoAP unsubscribe operation.
|
||||
func (tm *tracingServiceMiddleware) Unsubscribe(ctx context.Context, key string, chanID string, subptopic string, token string) error {
|
||||
func (tm *tracingServiceMiddleware) Unsubscribe(ctx context.Context, key, chanID, subptopic, token string) error {
|
||||
ctx, span := tm.tracer.Start(ctx, unsubscribeOP, trace.WithAttributes(
|
||||
attribute.String("channel_id", chanID),
|
||||
attribute.String("subtopic", subptopic),
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
"github.com/absmach/magistrala/pkg/messaging"
|
||||
"github.com/absmach/magistrala/pkg/messaging/brokers"
|
||||
@@ -33,7 +33,7 @@ var (
|
||||
// Start method starts consuming messages received from Message broker.
|
||||
// This method transforms messages to SenML format before
|
||||
// using MessageRepository to store them.
|
||||
func Start(ctx context.Context, id string, sub messaging.Subscriber, consumer interface{}, configPath string, logger logger.Logger) error {
|
||||
func Start(ctx context.Context, id string, sub messaging.Subscriber, consumer interface{}, configPath string, logger mglog.Logger) error {
|
||||
cfg, err := loadConfig(configPath)
|
||||
if err != nil {
|
||||
logger.Warn(fmt.Sprintf("Failed to load consumer config: %s", err))
|
||||
@@ -143,7 +143,7 @@ func loadConfig(configPath string) (config, error) {
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func makeTransformer(cfg transformerConfig, logger logger.Logger) transformers.Transformer {
|
||||
func makeTransformer(cfg transformerConfig, logger mglog.Logger) transformers.Transformer {
|
||||
switch strings.ToUpper(cfg.Format) {
|
||||
case "SENML":
|
||||
logger.Info("Using SenML transformer")
|
||||
|
||||
@@ -19,7 +19,7 @@ import (
|
||||
httpapi "github.com/absmach/magistrala/consumers/notifiers/api"
|
||||
"github.com/absmach/magistrala/consumers/notifiers/mocks"
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
svcerr "github.com/absmach/magistrala/pkg/errors/service"
|
||||
"github.com/absmach/magistrala/pkg/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -77,7 +77,7 @@ func newService() (notifiers.Service, *authmocks.Service) {
|
||||
}
|
||||
|
||||
func newServer(svc notifiers.Service) *httptest.Server {
|
||||
logger := logger.NewMock()
|
||||
logger := mglog.NewMock()
|
||||
mux := httpapi.MakeHandler(svc, logger, instanceID)
|
||||
return httptest.NewServer(mux)
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
"github.com/absmach/magistrala"
|
||||
"github.com/absmach/magistrala/consumers/notifiers"
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
svcerr "github.com/absmach/magistrala/pkg/errors/service"
|
||||
"github.com/go-chi/chi/v5"
|
||||
@@ -32,7 +32,7 @@ const (
|
||||
)
|
||||
|
||||
// MakeHandler returns a HTTP handler for API endpoints.
|
||||
func MakeHandler(svc notifiers.Service, logger logger.Logger, instanceID string) http.Handler {
|
||||
func MakeHandler(svc notifiers.Service, logger mglog.Logger, instanceID string) http.Handler {
|
||||
opts := []kithttp.ServerOption{
|
||||
kithttp.ServerErrorEncoder(apiutil.LoggingErrorEncoder(logger, encodeError)),
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ import (
|
||||
"github.com/absmach/magistrala/http/api"
|
||||
"github.com/absmach/magistrala/http/mocks"
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
mproxy "github.com/mainflux/mproxy/pkg/http"
|
||||
"github.com/mainflux/mproxy/pkg/session"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -28,7 +28,7 @@ const instanceID = "5de9b29a-feb9-11ed-be56-0242ac120002"
|
||||
|
||||
func newService(auth magistrala.AuthzServiceClient) session.Handler {
|
||||
pub := mocks.NewPublisher()
|
||||
return server.NewHandler(pub, logger.NewMock(), auth)
|
||||
return server.NewHandler(pub, mglog.NewMock(), auth)
|
||||
}
|
||||
|
||||
func newTargetHTTPServer() *httptest.Server {
|
||||
@@ -37,7 +37,7 @@ func newTargetHTTPServer() *httptest.Server {
|
||||
}
|
||||
|
||||
func newProxyHTPPServer(svc session.Handler, targetServer *httptest.Server) (*httptest.Server, error) {
|
||||
mp, err := mproxy.NewProxy("", targetServer.URL, svc, logger.NewMock())
|
||||
mp, err := mproxy.NewProxy("", targetServer.URL, svc, mglog.NewMock())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
+3
-3
@@ -13,7 +13,7 @@ import (
|
||||
|
||||
"github.com/absmach/magistrala"
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
"github.com/absmach/magistrala/pkg/messaging"
|
||||
"github.com/mainflux/mproxy/pkg/session"
|
||||
@@ -51,11 +51,11 @@ var channelRegExp = regexp.MustCompile(`^\/?channels\/([\w\-]+)\/messages(\/[^?]
|
||||
type handler struct {
|
||||
publisher messaging.Publisher
|
||||
auth magistrala.AuthzServiceClient
|
||||
logger logger.Logger
|
||||
logger mglog.Logger
|
||||
}
|
||||
|
||||
// NewHandler creates new Handler entity.
|
||||
func NewHandler(publisher messaging.Publisher, logger logger.Logger, auth magistrala.AuthzServiceClient) session.Handler {
|
||||
func NewHandler(publisher messaging.Publisher, logger mglog.Logger, auth magistrala.AuthzServiceClient) session.Handler {
|
||||
return &handler{
|
||||
logger: logger,
|
||||
publisher: publisher,
|
||||
|
||||
@@ -9,13 +9,13 @@ import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
kithttp "github.com/go-kit/kit/transport/http"
|
||||
)
|
||||
|
||||
// LoggingErrorEncoder is a go-kit error encoder logging decorator.
|
||||
func LoggingErrorEncoder(logger logger.Logger, enc kithttp.ErrorEncoder) kithttp.ErrorEncoder {
|
||||
func LoggingErrorEncoder(logger mglog.Logger, enc kithttp.ErrorEncoder) kithttp.ErrorEncoder {
|
||||
return func(ctx context.Context, err error, w http.ResponseWriter) {
|
||||
if errors.Contains(err, ErrValidation) {
|
||||
logger.Error(err.Error())
|
||||
@@ -45,7 +45,7 @@ func ReadUintQuery(r *http.Request, key string, def uint64) (uint64, error) {
|
||||
}
|
||||
|
||||
// ReadStringQuery reads the value of string http query parameters for a given key.
|
||||
func ReadStringQuery(r *http.Request, key string, def string) (string, error) {
|
||||
func ReadStringQuery(r *http.Request, key, def string) (string, error) {
|
||||
vals := r.URL.Query()[key]
|
||||
if len(vals) > 1 {
|
||||
return "", ErrInvalidQueryParams
|
||||
|
||||
@@ -32,7 +32,7 @@ func Setup(envPrefix string) (*gocql.Session, error) {
|
||||
// SetupDB load configuration from environment,
|
||||
// creates new cassandra connection and executes
|
||||
// the initial query in database.
|
||||
func SetupDB(envPrefix string, initQuery string) (*gocql.Session, error) {
|
||||
func SetupDB(envPrefix, initQuery string) (*gocql.Session, error) {
|
||||
cfg := Config{}
|
||||
if err := env.ParseWithOptions(&cfg, env.Options{Prefix: envPrefix}); err != nil {
|
||||
return nil, errors.Wrap(errConfig, err)
|
||||
|
||||
@@ -159,7 +159,7 @@ func ListMembersEndpoint(svc groups.Service, memberKind string) endpoint.Endpoin
|
||||
}
|
||||
}
|
||||
|
||||
func AssignMembersEndpoint(svc groups.Service, relation string, memberKind string) endpoint.Endpoint {
|
||||
func AssignMembersEndpoint(svc groups.Service, relation, memberKind string) endpoint.Endpoint {
|
||||
return func(ctx context.Context, request interface{}) (interface{}, error) {
|
||||
req := request.(assignReq)
|
||||
if relation != "" {
|
||||
@@ -178,7 +178,7 @@ func AssignMembersEndpoint(svc groups.Service, relation string, memberKind strin
|
||||
}
|
||||
}
|
||||
|
||||
func UnassignMembersEndpoint(svc groups.Service, relation string, memberKind string) endpoint.Endpoint {
|
||||
func UnassignMembersEndpoint(svc groups.Service, relation, memberKind string) endpoint.Endpoint {
|
||||
return func(ctx context.Context, request interface{}) (interface{}, error) {
|
||||
req := request.(unassignReq)
|
||||
if relation != "" {
|
||||
|
||||
@@ -33,11 +33,11 @@ type service struct {
|
||||
}
|
||||
|
||||
// NewService returns a new Clients service implementation.
|
||||
func NewService(g groups.Repository, idp magistrala.IDProvider, auth magistrala.AuthServiceClient) groups.Service {
|
||||
func NewService(g groups.Repository, idp magistrala.IDProvider, authClient magistrala.AuthServiceClient) groups.Service {
|
||||
return service{
|
||||
groups: g,
|
||||
idProvider: idp,
|
||||
auth: auth,
|
||||
auth: authClient,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,7 +115,7 @@ func (svc service) ViewGroup(ctx context.Context, token, id string) (groups.Grou
|
||||
return svc.groups.RetrieveByID(ctx, id)
|
||||
}
|
||||
|
||||
func (svc service) ViewGroupPerms(ctx context.Context, token string, id string) ([]string, error) {
|
||||
func (svc service) ViewGroupPerms(ctx context.Context, token, id string) ([]string, error) {
|
||||
res, err := svc.identify(ctx, token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -452,16 +452,16 @@ func (svc service) Assign(ctx context.Context, token, groupID, relation, memberK
|
||||
}
|
||||
|
||||
func (svc service) assignParentGroup(ctx context.Context, domain, parentGroupID string, groupIDs []string) (err error) {
|
||||
groups, err := svc.groups.RetrieveByIDs(ctx, groups.Page{PageMeta: groups.PageMeta{Limit: 1<<63 - 1}}, groupIDs...)
|
||||
groupsPage, err := svc.groups.RetrieveByIDs(ctx, groups.Page{PageMeta: groups.PageMeta{Limit: 1<<63 - 1}}, groupIDs...)
|
||||
if err != nil {
|
||||
return errors.Wrap(errRetrieveGroups, err)
|
||||
}
|
||||
if len(groups.Groups) == 0 {
|
||||
if len(groupsPage.Groups) == 0 {
|
||||
return errGroupIDs
|
||||
}
|
||||
var addPolicies magistrala.AddPoliciesReq
|
||||
var deletePolicies magistrala.DeletePoliciesReq
|
||||
for _, group := range groups.Groups {
|
||||
for _, group := range groupsPage.Groups {
|
||||
if group.Parent != "" {
|
||||
return fmt.Errorf("%s group already have parent", group.ID)
|
||||
}
|
||||
@@ -498,16 +498,16 @@ func (svc service) assignParentGroup(ctx context.Context, domain, parentGroupID
|
||||
}
|
||||
|
||||
func (svc service) unassignParentGroup(ctx context.Context, domain, parentGroupID string, groupIDs []string) error {
|
||||
groups, err := svc.groups.RetrieveByIDs(ctx, groups.Page{PageMeta: groups.PageMeta{Limit: 1<<63 - 1}}, groupIDs...)
|
||||
groupsPage, err := svc.groups.RetrieveByIDs(ctx, groups.Page{PageMeta: groups.PageMeta{Limit: 1<<63 - 1}}, groupIDs...)
|
||||
if err != nil {
|
||||
return errors.Wrap(errRetrieveGroups, err)
|
||||
}
|
||||
if len(groups.Groups) == 0 {
|
||||
if len(groupsPage.Groups) == 0 {
|
||||
return errGroupIDs
|
||||
}
|
||||
var addPolicies magistrala.AddPoliciesReq
|
||||
var deletePolicies magistrala.DeletePoliciesReq
|
||||
for _, group := range groups.Groups {
|
||||
for _, group := range groupsPage.Groups {
|
||||
if group.Parent != "" && group.Parent != parentGroupID {
|
||||
return fmt.Errorf("%s group doesn't have same parent", group.ID)
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/absmach/magistrala/internal/server"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
gocoap "github.com/plgd-dev/go-coap/v2"
|
||||
"github.com/plgd-dev/go-coap/v2/mux"
|
||||
)
|
||||
@@ -26,7 +26,7 @@ type Server struct {
|
||||
|
||||
var _ server.Server = (*Server)(nil)
|
||||
|
||||
func New(ctx context.Context, cancel context.CancelFunc, name string, config server.Config, handler mux.HandlerFunc, logger logger.Logger) server.Server {
|
||||
func New(ctx context.Context, cancel context.CancelFunc, name string, config server.Config, handler mux.HandlerFunc, logger mglog.Logger) server.Server {
|
||||
listenFullAddress := fmt.Sprintf("%s:%s", config.Host, config.Port)
|
||||
return &Server{
|
||||
BaseServer: server.BaseServer{
|
||||
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/absmach/magistrala/internal/server"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
@@ -34,7 +34,7 @@ type serviceRegister func(srv *grpc.Server)
|
||||
|
||||
var _ server.Server = (*Server)(nil)
|
||||
|
||||
func New(ctx context.Context, cancel context.CancelFunc, name string, config server.Config, registerService serviceRegister, logger logger.Logger) server.Server {
|
||||
func New(ctx context.Context, cancel context.CancelFunc, name string, config server.Config, registerService serviceRegister, logger mglog.Logger) server.Server {
|
||||
listenFullAddress := fmt.Sprintf("%s:%s", config.Host, config.Port)
|
||||
return &Server{
|
||||
BaseServer: server.BaseServer{
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/absmach/magistrala/internal/server"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -26,7 +26,7 @@ type Server struct {
|
||||
|
||||
var _ server.Server = (*Server)(nil)
|
||||
|
||||
func New(ctx context.Context, cancel context.CancelFunc, name string, config server.Config, handler http.Handler, logger logger.Logger) server.Server {
|
||||
func New(ctx context.Context, cancel context.CancelFunc, name string, config server.Config, handler http.Handler, logger mglog.Logger) server.Server {
|
||||
listenFullAddress := fmt.Sprintf("%s:%s", config.Host, config.Port)
|
||||
httpServer := &http.Server{Addr: listenFullAddress, Handler: handler}
|
||||
return &Server{
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
)
|
||||
|
||||
type Server interface {
|
||||
@@ -32,7 +32,7 @@ type BaseServer struct {
|
||||
Name string
|
||||
Address string
|
||||
Config Config
|
||||
Logger logger.Logger
|
||||
Logger mglog.Logger
|
||||
Protocol string
|
||||
}
|
||||
|
||||
@@ -51,7 +51,7 @@ func stopAllServer(servers ...Server) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func StopSignalHandler(ctx context.Context, cancel context.CancelFunc, logger logger.Logger, svcName string, servers ...Server) error {
|
||||
func StopSignalHandler(ctx context.Context, cancel context.CancelFunc, logger mglog.Logger, svcName string, servers ...Server) error {
|
||||
var err error
|
||||
c := make(chan os.Signal, 2)
|
||||
signal.Notify(c, syscall.SIGINT, syscall.SIGABRT)
|
||||
|
||||
+8
-8
@@ -34,19 +34,19 @@ var (
|
||||
// implementation, and all of its decorators (e.g. logging & metrics).
|
||||
type Service interface {
|
||||
// CreateThing creates thingID:devEUI route-map
|
||||
CreateThing(ctx context.Context, thingID string, devEUI string) error
|
||||
CreateThing(ctx context.Context, thingID, devEUI string) error
|
||||
|
||||
// UpdateThing updates thingID:devEUI route-map
|
||||
UpdateThing(ctx context.Context, thingID string, devEUI string) error
|
||||
UpdateThing(ctx context.Context, thingID, devEUI string) error
|
||||
|
||||
// RemoveThing removes thingID:devEUI route-map
|
||||
RemoveThing(ctx context.Context, thingID string) error
|
||||
|
||||
// CreateChannel creates channelID:appID route-map
|
||||
CreateChannel(ctx context.Context, chanID string, appID string) error
|
||||
CreateChannel(ctx context.Context, chanID, appID string) error
|
||||
|
||||
// UpdateChannel updates channelID:appID route-map
|
||||
UpdateChannel(ctx context.Context, chanID string, appID string) error
|
||||
UpdateChannel(ctx context.Context, chanID, appID string) error
|
||||
|
||||
// RemoveChannel removes channelID:appID route-map
|
||||
RemoveChannel(ctx context.Context, chanID string) error
|
||||
@@ -128,11 +128,11 @@ func (as *adapterService) Publish(ctx context.Context, m *Message) error {
|
||||
return as.publisher.Publish(ctx, msg.Channel, &msg)
|
||||
}
|
||||
|
||||
func (as *adapterService) CreateThing(ctx context.Context, thingID string, devEUI string) error {
|
||||
func (as *adapterService) CreateThing(ctx context.Context, thingID, devEUI string) error {
|
||||
return as.thingsRM.Save(ctx, thingID, devEUI)
|
||||
}
|
||||
|
||||
func (as *adapterService) UpdateThing(ctx context.Context, thingID string, devEUI string) error {
|
||||
func (as *adapterService) UpdateThing(ctx context.Context, thingID, devEUI string) error {
|
||||
return as.thingsRM.Save(ctx, thingID, devEUI)
|
||||
}
|
||||
|
||||
@@ -140,11 +140,11 @@ func (as *adapterService) RemoveThing(ctx context.Context, thingID string) error
|
||||
return as.thingsRM.Remove(ctx, thingID)
|
||||
}
|
||||
|
||||
func (as *adapterService) CreateChannel(ctx context.Context, chanID string, appID string) error {
|
||||
func (as *adapterService) CreateChannel(ctx context.Context, chanID, appID string) error {
|
||||
return as.channelsRM.Save(ctx, chanID, appID)
|
||||
}
|
||||
|
||||
func (as *adapterService) UpdateChannel(ctx context.Context, chanID string, appID string) error {
|
||||
func (as *adapterService) UpdateChannel(ctx context.Context, chanID, appID string) error {
|
||||
return as.channelsRM.Save(ctx, chanID, appID)
|
||||
}
|
||||
|
||||
|
||||
+5
-5
@@ -10,26 +10,26 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/lora"
|
||||
)
|
||||
|
||||
var _ lora.Service = (*loggingMiddleware)(nil)
|
||||
|
||||
type loggingMiddleware struct {
|
||||
logger logger.Logger
|
||||
logger mglog.Logger
|
||||
svc lora.Service
|
||||
}
|
||||
|
||||
// LoggingMiddleware adds logging facilities to the core service.
|
||||
func LoggingMiddleware(svc lora.Service, logger logger.Logger) lora.Service {
|
||||
func LoggingMiddleware(svc lora.Service, logger mglog.Logger) lora.Service {
|
||||
return &loggingMiddleware{
|
||||
logger: logger,
|
||||
svc: svc,
|
||||
}
|
||||
}
|
||||
|
||||
func (lm loggingMiddleware) CreateThing(ctx context.Context, thingID string, loraDevEUI string) (err error) {
|
||||
func (lm loggingMiddleware) CreateThing(ctx context.Context, thingID, loraDevEUI string) (err error) {
|
||||
defer func(begin time.Time) {
|
||||
message := fmt.Sprintf("create_thing for thing %s and lora-dev-eui %s took %s to complete", thingID, loraDevEUI, time.Since(begin))
|
||||
if err != nil {
|
||||
@@ -42,7 +42,7 @@ func (lm loggingMiddleware) CreateThing(ctx context.Context, thingID string, lor
|
||||
return lm.svc.CreateThing(ctx, thingID, loraDevEUI)
|
||||
}
|
||||
|
||||
func (lm loggingMiddleware) UpdateThing(ctx context.Context, thingID string, loraDevEUI string) (err error) {
|
||||
func (lm loggingMiddleware) UpdateThing(ctx context.Context, thingID, loraDevEUI string) (err error) {
|
||||
defer func(begin time.Time) {
|
||||
message := fmt.Sprintf("update_thing for thing %s and lora-dev-eui %s took %s to complete", thingID, loraDevEUI, time.Since(begin))
|
||||
if err != nil {
|
||||
|
||||
+2
-2
@@ -30,7 +30,7 @@ func MetricsMiddleware(svc lora.Service, counter metrics.Counter, latency metric
|
||||
}
|
||||
}
|
||||
|
||||
func (mm *metricsMiddleware) CreateThing(ctx context.Context, thingID string, loraDevEUI string) error {
|
||||
func (mm *metricsMiddleware) CreateThing(ctx context.Context, thingID, loraDevEUI string) error {
|
||||
defer func(begin time.Time) {
|
||||
mm.counter.With("method", "create_thing").Add(1)
|
||||
mm.latency.With("method", "create_thing").Observe(time.Since(begin).Seconds())
|
||||
@@ -39,7 +39,7 @@ func (mm *metricsMiddleware) CreateThing(ctx context.Context, thingID string, lo
|
||||
return mm.svc.CreateThing(ctx, thingID, loraDevEUI)
|
||||
}
|
||||
|
||||
func (mm *metricsMiddleware) UpdateThing(ctx context.Context, thingID string, loraDevEUI string) error {
|
||||
func (mm *metricsMiddleware) UpdateThing(ctx context.Context, thingID, loraDevEUI string) error {
|
||||
defer func(begin time.Time) {
|
||||
mm.counter.With("method", "update_thing").Add(1)
|
||||
mm.latency.With("method", "update_thing").Observe(time.Since(begin).Seconds())
|
||||
|
||||
+3
-3
@@ -10,7 +10,7 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/lora"
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
)
|
||||
@@ -24,12 +24,12 @@ type Subscriber interface {
|
||||
type broker struct {
|
||||
svc lora.Service
|
||||
client mqtt.Client
|
||||
logger logger.Logger
|
||||
logger mglog.Logger
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
// NewBroker returns new MQTT broker instance.
|
||||
func NewBroker(svc lora.Service, client mqtt.Client, t time.Duration, log logger.Logger) Subscriber {
|
||||
func NewBroker(svc lora.Service, client mqtt.Client, t time.Duration, log mglog.Logger) Subscriber {
|
||||
return broker{
|
||||
svc: svc,
|
||||
client: client,
|
||||
|
||||
+3
-3
@@ -12,7 +12,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/absmach/magistrala"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/mqtt/events"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
"github.com/absmach/magistrala/pkg/messaging"
|
||||
@@ -57,12 +57,12 @@ var channelRegExp = regexp.MustCompile(`^\/?channels\/([\w\-]+)\/messages(\/[^?]
|
||||
type handler struct {
|
||||
publisher messaging.Publisher
|
||||
auth magistrala.AuthzServiceClient
|
||||
logger logger.Logger
|
||||
logger mglog.Logger
|
||||
es events.EventStore
|
||||
}
|
||||
|
||||
// NewHandler creates new Handler entity.
|
||||
func NewHandler(publisher messaging.Publisher, es events.EventStore, logger logger.Logger, auth magistrala.AuthzServiceClient) session.Handler {
|
||||
func NewHandler(publisher messaging.Publisher, es events.EventStore, logger mglog.Logger, auth magistrala.AuthzServiceClient) session.Handler {
|
||||
return &handler{
|
||||
es: es,
|
||||
logger: logger,
|
||||
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
"github.com/absmach/magistrala"
|
||||
authmocks "github.com/absmach/magistrala/auth/mocks"
|
||||
"github.com/absmach/magistrala/internal/testsutil"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/mqtt"
|
||||
"github.com/absmach/magistrala/mqtt/mocks"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
@@ -442,7 +442,7 @@ func TestDisconnect(t *testing.T) {
|
||||
}
|
||||
|
||||
func newHandler() (session.Handler, *authmocks.Service) {
|
||||
logger, err := logger.New(&logBuffer, "debug")
|
||||
logger, err := mglog.New(&logBuffer, "debug")
|
||||
if err != nil {
|
||||
log.Fatalf("failed to create logger: %s", err)
|
||||
}
|
||||
|
||||
+3
-3
@@ -7,7 +7,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/opcua/db"
|
||||
)
|
||||
|
||||
@@ -62,11 +62,11 @@ type adapterService struct {
|
||||
channelsRM RouteMapRepository
|
||||
connectRM RouteMapRepository
|
||||
cfg Config
|
||||
logger logger.Logger
|
||||
logger mglog.Logger
|
||||
}
|
||||
|
||||
// New instantiates the OPC-UA adapter implementation.
|
||||
func New(sub Subscriber, brow Browser, thingsRM, channelsRM, connectRM RouteMapRepository, cfg Config, log logger.Logger) Service {
|
||||
func New(sub Subscriber, brow Browser, thingsRM, channelsRM, connectRM RouteMapRepository, cfg Config, log mglog.Logger) Service {
|
||||
return &adapterService{
|
||||
subscriber: sub,
|
||||
browser: brow,
|
||||
|
||||
@@ -10,19 +10,19 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/opcua"
|
||||
)
|
||||
|
||||
var _ opcua.Service = (*loggingMiddleware)(nil)
|
||||
|
||||
type loggingMiddleware struct {
|
||||
logger logger.Logger
|
||||
logger mglog.Logger
|
||||
svc opcua.Service
|
||||
}
|
||||
|
||||
// LoggingMiddleware adds logging facilities to the core service.
|
||||
func LoggingMiddleware(svc opcua.Service, logger logger.Logger) opcua.Service {
|
||||
func LoggingMiddleware(svc opcua.Service, logger mglog.Logger) opcua.Service {
|
||||
return &loggingMiddleware{
|
||||
logger: logger,
|
||||
svc: svc,
|
||||
|
||||
@@ -6,7 +6,7 @@ package gopcua
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/opcua"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
opcuagocpua "github.com/gopcua/opcua"
|
||||
@@ -36,11 +36,11 @@ var _ opcua.Browser = (*browser)(nil)
|
||||
|
||||
type browser struct {
|
||||
ctx context.Context
|
||||
logger logger.Logger
|
||||
logger mglog.Logger
|
||||
}
|
||||
|
||||
// NewBrowser returns new OPC-UA browser instance.
|
||||
func NewBrowser(ctx context.Context, log logger.Logger) opcua.Browser {
|
||||
func NewBrowser(ctx context.Context, log mglog.Logger) opcua.Browser {
|
||||
return browser{
|
||||
ctx: ctx,
|
||||
logger: log,
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/opcua"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
"github.com/absmach/magistrala/pkg/messaging"
|
||||
@@ -45,7 +45,7 @@ type client struct {
|
||||
thingsRM opcua.RouteMapRepository
|
||||
channelsRM opcua.RouteMapRepository
|
||||
connectRM opcua.RouteMapRepository
|
||||
logger logger.Logger
|
||||
logger mglog.Logger
|
||||
}
|
||||
|
||||
type message struct {
|
||||
@@ -58,7 +58,7 @@ type message struct {
|
||||
}
|
||||
|
||||
// NewSubscriber returns new OPC-UA client instance.
|
||||
func NewSubscriber(ctx context.Context, publisher messaging.Publisher, thingsRM, channelsRM, connectRM opcua.RouteMapRepository, log logger.Logger) opcua.Subscriber {
|
||||
func NewSubscriber(ctx context.Context, publisher messaging.Publisher, thingsRM, channelsRM, connectRM opcua.RouteMapRepository, log mglog.Logger) opcua.Subscriber {
|
||||
return client{
|
||||
ctx: ctx,
|
||||
publisher: publisher,
|
||||
|
||||
@@ -68,7 +68,7 @@ func (ce *customError) MarshalJSON() ([]byte, error) {
|
||||
}
|
||||
|
||||
// Contains inspects if e2 error is contained in any layer of e1 error.
|
||||
func Contains(e1 error, e2 error) bool {
|
||||
func Contains(e1, e2 error) bool {
|
||||
if e1 == nil || e2 == nil {
|
||||
return e2 == e1
|
||||
}
|
||||
@@ -83,7 +83,7 @@ func Contains(e1 error, e2 error) bool {
|
||||
}
|
||||
|
||||
// Wrap returns an Error that wrap err with wrapper.
|
||||
func Wrap(wrapper error, err error) error {
|
||||
func Wrap(wrapper, err error) error {
|
||||
if wrapper == nil || err == nil {
|
||||
return wrapper
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@ import (
|
||||
httpapi "github.com/absmach/magistrala/certs/api"
|
||||
"github.com/absmach/magistrala/certs/mocks"
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/pkg/clients"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
sdk "github.com/absmach/magistrala/pkg/sdk/go"
|
||||
@@ -61,7 +61,7 @@ func newCertService() (certs.Service, *authmocks.Service, *thmocks.Repository, e
|
||||
}
|
||||
|
||||
func newCertServer(svc certs.Service) *httptest.Server {
|
||||
logger := logger.NewMock()
|
||||
logger := mglog.NewMock()
|
||||
mux := httpapi.MakeHandler(svc, logger, instanceID)
|
||||
return httptest.NewServer(mux)
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
httpapi "github.com/absmach/magistrala/consumers/notifiers/api"
|
||||
"github.com/absmach/magistrala/consumers/notifiers/mocks"
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
sdk "github.com/absmach/magistrala/pkg/sdk/go"
|
||||
"github.com/absmach/magistrala/pkg/uuid"
|
||||
@@ -44,7 +44,7 @@ func newSubscriptionService() (notifiers.Service, *authmocks.Service) {
|
||||
}
|
||||
|
||||
func newSubscriptionServer(svc notifiers.Service) *httptest.Server {
|
||||
logger := logger.NewMock()
|
||||
logger := mglog.NewMock()
|
||||
mux := httpapi.MakeHandler(svc, logger, instanceID)
|
||||
|
||||
return httptest.NewServer(mux)
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
"github.com/absmach/magistrala/http/api"
|
||||
"github.com/absmach/magistrala/http/mocks"
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
sdk "github.com/absmach/magistrala/pkg/sdk/go"
|
||||
mproxy "github.com/mainflux/mproxy/pkg/http"
|
||||
@@ -27,7 +27,7 @@ import (
|
||||
func newMessageService(cc magistrala.AuthzServiceClient) session.Handler {
|
||||
pub := mocks.NewPublisher()
|
||||
|
||||
return adapter.NewHandler(pub, logger.NewMock(), cc)
|
||||
return adapter.NewHandler(pub, mglog.NewMock(), cc)
|
||||
}
|
||||
|
||||
func newTargetHTTPServer() *httptest.Server {
|
||||
@@ -36,7 +36,7 @@ func newTargetHTTPServer() *httptest.Server {
|
||||
}
|
||||
|
||||
func newProxyHTTPServer(svc session.Handler, targetServer *httptest.Server) (*httptest.Server, error) {
|
||||
mp, err := mproxy.NewProxy("", targetServer.URL, svc, logger.NewMock())
|
||||
mp, err := mproxy.NewProxy("", targetServer.URL, svc, mglog.NewMock())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
+2
-2
@@ -1135,8 +1135,8 @@ func NewSDK(conf Config) SDK {
|
||||
|
||||
// processRequest creates and send a new HTTP request, and checks for errors in the HTTP response.
|
||||
// It then returns the response headers, the response body, and the associated error(s) (if any).
|
||||
func (sdk mgSDK) processRequest(method, url, token string, data []byte, headers map[string]string, expectedRespCodes ...int) (http.Header, []byte, errors.SDKError) {
|
||||
req, err := http.NewRequest(method, url, bytes.NewReader(data))
|
||||
func (sdk mgSDK) processRequest(method, reqUrl, token string, data []byte, headers map[string]string, expectedRespCodes ...int) (http.Header, []byte, errors.SDKError) {
|
||||
req, err := http.NewRequest(method, reqUrl, bytes.NewReader(data))
|
||||
if err != nil {
|
||||
return make(http.Header), []byte{}, errors.NewSDKError(err)
|
||||
}
|
||||
|
||||
@@ -195,7 +195,7 @@ func TestCreateThings(t *testing.T) {
|
||||
ts, cRepo, _, auth := newThingsServer()
|
||||
defer ts.Close()
|
||||
|
||||
things := []sdk.Thing{
|
||||
thingsList := []sdk.Thing{
|
||||
{
|
||||
Name: "test",
|
||||
Status: mgclients.EnabledStatus.String(),
|
||||
@@ -219,14 +219,14 @@ func TestCreateThings(t *testing.T) {
|
||||
}{
|
||||
{
|
||||
desc: "register new things",
|
||||
things: things,
|
||||
response: things,
|
||||
things: thingsList,
|
||||
response: thingsList,
|
||||
token: token,
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
desc: "register existing things",
|
||||
things: things,
|
||||
things: thingsList,
|
||||
response: []sdk.Thing{},
|
||||
token: token,
|
||||
err: errors.NewSDKErrorWithStatus(errors.Wrap(apiutil.ErrValidation, sdk.ErrFailedCreation), http.StatusInternalServerError),
|
||||
|
||||
@@ -38,7 +38,7 @@ func (lm *loggingMiddleware) Provision(token, name, externalID, externalKey stri
|
||||
return lm.svc.Provision(token, name, externalID, externalKey)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) Cert(token, thingID, duration string) (cert string, key string, err error) {
|
||||
func (lm *loggingMiddleware) Cert(token, thingID, duration string) (cert, key string, err error) {
|
||||
defer func(begin time.Time) {
|
||||
message := fmt.Sprintf("Method cert for token: %s and thing: %v took %s to complete", token, thingID, time.Since(begin))
|
||||
if err != nil {
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
|
||||
"github.com/absmach/magistrala"
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
"github.com/absmach/magistrala/provision"
|
||||
"github.com/go-chi/chi/v5"
|
||||
@@ -23,7 +23,7 @@ const (
|
||||
)
|
||||
|
||||
// MakeHandler returns a HTTP handler for API endpoints.
|
||||
func MakeHandler(svc provision.Service, logger logger.Logger, instanceID string) http.Handler {
|
||||
func MakeHandler(svc provision.Service, logger mglog.Logger, instanceID string) http.Handler {
|
||||
opts := []kithttp.ServerOption{
|
||||
kithttp.ServerErrorEncoder(apiutil.LoggingErrorEncoder(logger, encodeError)),
|
||||
}
|
||||
|
||||
@@ -85,11 +85,11 @@ type Result struct {
|
||||
}
|
||||
|
||||
// New returns new provision service.
|
||||
func New(cfg Config, sdk sdk.SDK, logger mglog.Logger) Service {
|
||||
func New(cfg Config, mgsdk sdk.SDK, logger mglog.Logger) Service {
|
||||
return &provisionService{
|
||||
logger: logger,
|
||||
conf: cfg,
|
||||
sdk: sdk,
|
||||
sdk: mgsdk,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -61,7 +61,7 @@ type testRequest struct {
|
||||
}
|
||||
|
||||
func (tr testRequest) make() (*http.Response, error) {
|
||||
req, err := http.NewRequest(tr.method, tr.url, nil)
|
||||
req, err := http.NewRequest(tr.method, tr.url, http.NoBody)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -9,19 +9,19 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/readers"
|
||||
)
|
||||
|
||||
var _ readers.MessageRepository = (*loggingMiddleware)(nil)
|
||||
|
||||
type loggingMiddleware struct {
|
||||
logger logger.Logger
|
||||
logger mglog.Logger
|
||||
svc readers.MessageRepository
|
||||
}
|
||||
|
||||
// LoggingMiddleware adds logging facilities to the core service.
|
||||
func LoggingMiddleware(svc readers.MessageRepository, logger logger.Logger) readers.MessageRepository {
|
||||
func LoggingMiddleware(svc readers.MessageRepository, logger mglog.Logger) readers.MessageRepository {
|
||||
return &loggingMiddleware{
|
||||
logger: logger,
|
||||
svc: svc,
|
||||
|
||||
@@ -105,7 +105,7 @@ func (repo *influxRepository) ReadAll(chanID string, rpm readers.PageMetadata) (
|
||||
return page, nil
|
||||
}
|
||||
|
||||
func (repo *influxRepository) count(measurement, condition string, timeRange string) (uint64, error) {
|
||||
func (repo *influxRepository) count(measurement, condition, timeRange string) (uint64, error) {
|
||||
cmd := fmt.Sprintf(`
|
||||
import "influxdata/influxdb/v1"
|
||||
import "strings"
|
||||
|
||||
+27
-27
@@ -49,30 +49,30 @@ func (repo *messageRepositoryMock) ReadAll(chanID string, rpm readers.PageMetada
|
||||
|
||||
var msgs []readers.Message
|
||||
for _, m := range repo.messages[chanID] {
|
||||
senml := m.(senml.Message)
|
||||
msg := m.(senml.Message)
|
||||
|
||||
ok := true
|
||||
|
||||
for name := range query {
|
||||
switch name {
|
||||
case "subtopic":
|
||||
if rpm.Subtopic != senml.Subtopic {
|
||||
if rpm.Subtopic != msg.Subtopic {
|
||||
ok = false
|
||||
}
|
||||
case "publisher":
|
||||
if rpm.Publisher != senml.Publisher {
|
||||
if rpm.Publisher != msg.Publisher {
|
||||
ok = false
|
||||
}
|
||||
case "name":
|
||||
if rpm.Name != senml.Name {
|
||||
if rpm.Name != msg.Name {
|
||||
ok = false
|
||||
}
|
||||
case "protocol":
|
||||
if rpm.Protocol != senml.Protocol {
|
||||
if rpm.Protocol != msg.Protocol {
|
||||
ok = false
|
||||
}
|
||||
case "v":
|
||||
if senml.Value == nil {
|
||||
if msg.Value == nil {
|
||||
ok = false
|
||||
}
|
||||
|
||||
@@ -80,57 +80,57 @@ func (repo *messageRepositoryMock) ReadAll(chanID string, rpm readers.PageMetada
|
||||
if okQuery {
|
||||
switch val.(string) {
|
||||
case readers.LowerThanKey:
|
||||
if senml.Value != nil &&
|
||||
*senml.Value >= rpm.Value {
|
||||
if msg.Value != nil &&
|
||||
*msg.Value >= rpm.Value {
|
||||
ok = false
|
||||
}
|
||||
case readers.LowerThanEqualKey:
|
||||
if senml.Value != nil &&
|
||||
*senml.Value > rpm.Value {
|
||||
if msg.Value != nil &&
|
||||
*msg.Value > rpm.Value {
|
||||
ok = false
|
||||
}
|
||||
case readers.GreaterThanKey:
|
||||
if senml.Value != nil &&
|
||||
*senml.Value <= rpm.Value {
|
||||
if msg.Value != nil &&
|
||||
*msg.Value <= rpm.Value {
|
||||
ok = false
|
||||
}
|
||||
case readers.GreaterThanEqualKey:
|
||||
if senml.Value != nil &&
|
||||
*senml.Value < rpm.Value {
|
||||
if msg.Value != nil &&
|
||||
*msg.Value < rpm.Value {
|
||||
ok = false
|
||||
}
|
||||
case readers.EqualKey:
|
||||
default:
|
||||
if senml.Value != nil &&
|
||||
*senml.Value != rpm.Value {
|
||||
if msg.Value != nil &&
|
||||
*msg.Value != rpm.Value {
|
||||
ok = false
|
||||
}
|
||||
}
|
||||
}
|
||||
case "vb":
|
||||
if senml.BoolValue == nil ||
|
||||
(senml.BoolValue != nil &&
|
||||
*senml.BoolValue != rpm.BoolValue) {
|
||||
if msg.BoolValue == nil ||
|
||||
(msg.BoolValue != nil &&
|
||||
*msg.BoolValue != rpm.BoolValue) {
|
||||
ok = false
|
||||
}
|
||||
case "vs":
|
||||
if senml.StringValue == nil ||
|
||||
(senml.StringValue != nil &&
|
||||
*senml.StringValue != rpm.StringValue) {
|
||||
if msg.StringValue == nil ||
|
||||
(msg.StringValue != nil &&
|
||||
*msg.StringValue != rpm.StringValue) {
|
||||
ok = false
|
||||
}
|
||||
case "vd":
|
||||
if senml.DataValue == nil ||
|
||||
(senml.DataValue != nil &&
|
||||
*senml.DataValue != rpm.DataValue) {
|
||||
if msg.DataValue == nil ||
|
||||
(msg.DataValue != nil &&
|
||||
*msg.DataValue != rpm.DataValue) {
|
||||
ok = false
|
||||
}
|
||||
case "from":
|
||||
if senml.Time < rpm.From {
|
||||
if msg.Time < rpm.From {
|
||||
ok = false
|
||||
}
|
||||
case "to":
|
||||
if senml.Time >= rpm.To {
|
||||
if msg.Time >= rpm.To {
|
||||
ok = false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,8 +39,8 @@ func NewClient(conn *grpc.ClientConn, timeout time.Duration) magistrala.AuthzSer
|
||||
}
|
||||
|
||||
func (client grpcClient) Authorize(ctx context.Context, req *magistrala.AuthorizeReq, _ ...grpc.CallOption) (r *magistrala.AuthorizeRes, err error) {
|
||||
ctx, close := context.WithTimeout(ctx, client.timeout)
|
||||
defer close()
|
||||
ctx, cancel := context.WithTimeout(ctx, client.timeout)
|
||||
defer cancel()
|
||||
|
||||
res, err := client.authorize(ctx, req)
|
||||
if err != nil {
|
||||
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
"github.com/absmach/magistrala/internal/api"
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
gapi "github.com/absmach/magistrala/internal/groups/api"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
"github.com/absmach/magistrala/pkg/groups"
|
||||
"github.com/go-chi/chi/v5"
|
||||
@@ -21,7 +21,7 @@ import (
|
||||
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
|
||||
)
|
||||
|
||||
func groupsHandler(svc groups.Service, r *chi.Mux, logger logger.Logger) http.Handler {
|
||||
func groupsHandler(svc groups.Service, r *chi.Mux, logger mglog.Logger) http.Handler {
|
||||
opts := []kithttp.ServerOption{
|
||||
kithttp.ServerErrorEncoder(apiutil.LoggingErrorEncoder(logger, api.EncodeError)),
|
||||
}
|
||||
|
||||
@@ -61,7 +61,7 @@ func (lm *loggingMiddleware) ViewClientPerms(ctx context.Context, token, id stri
|
||||
return lm.svc.ViewClientPerms(ctx, token, id)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) ListClients(ctx context.Context, token string, reqUserID string, pm mgclients.Page) (cp mgclients.ClientsPage, err error) {
|
||||
func (lm *loggingMiddleware) ListClients(ctx context.Context, token, reqUserID string, pm mgclients.Page) (cp mgclients.ClientsPage, err error) {
|
||||
defer func(begin time.Time) {
|
||||
message := fmt.Sprintf("Method list_things using token %s took %s to complete", token, time.Since(begin))
|
||||
if err != nil {
|
||||
@@ -109,7 +109,7 @@ func (lm *loggingMiddleware) UpdateClientSecret(ctx context.Context, token, oldS
|
||||
return lm.svc.UpdateClientSecret(ctx, token, oldSecret, newSecret)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) EnableClient(ctx context.Context, token string, id string) (c mgclients.Client, err error) {
|
||||
func (lm *loggingMiddleware) EnableClient(ctx context.Context, token, id string) (c mgclients.Client, err error) {
|
||||
defer func(begin time.Time) {
|
||||
message := fmt.Sprintf("Method enable_thing for thing with id %s using token %s took %s to complete", id, token, time.Since(begin))
|
||||
if err != nil {
|
||||
@@ -121,7 +121,7 @@ func (lm *loggingMiddleware) EnableClient(ctx context.Context, token string, id
|
||||
return lm.svc.EnableClient(ctx, token, id)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) DisableClient(ctx context.Context, token string, id string) (c mgclients.Client, err error) {
|
||||
func (lm *loggingMiddleware) DisableClient(ctx context.Context, token, id string) (c mgclients.Client, err error) {
|
||||
defer func(begin time.Time) {
|
||||
message := fmt.Sprintf("Method disable_thing for thing with id %s using token %s took %s to complete", id, token, time.Since(begin))
|
||||
if err != nil {
|
||||
@@ -169,7 +169,7 @@ func (lm *loggingMiddleware) Authorize(ctx context.Context, req *magistrala.Auth
|
||||
return lm.svc.Authorize(ctx, req)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) Share(ctx context.Context, token, id string, relation string, userids ...string) (err error) {
|
||||
func (lm *loggingMiddleware) Share(ctx context.Context, token, id, relation string, userids ...string) (err error) {
|
||||
defer func(begin time.Time) {
|
||||
message := fmt.Sprintf("Method share for thing id %s with relation %s for users %v took %s to complete", id, relation, userids, time.Since(begin))
|
||||
if err != nil {
|
||||
@@ -181,7 +181,7 @@ func (lm *loggingMiddleware) Share(ctx context.Context, token, id string, relati
|
||||
return lm.svc.Share(ctx, token, id, relation, userids...)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) Unshare(ctx context.Context, token, id string, relation string, userids ...string) (err error) {
|
||||
func (lm *loggingMiddleware) Unshare(ctx context.Context, token, id, relation string, userids ...string) (err error) {
|
||||
defer func(begin time.Time) {
|
||||
message := fmt.Sprintf("Method unshare for thing id %s with relation %s for users %v took %s to complete", id, relation, userids, time.Since(begin))
|
||||
if err != nil {
|
||||
|
||||
@@ -54,7 +54,7 @@ func (ms *metricsMiddleware) ViewClientPerms(ctx context.Context, token, id stri
|
||||
return ms.svc.ViewClientPerms(ctx, token, id)
|
||||
}
|
||||
|
||||
func (ms *metricsMiddleware) ListClients(ctx context.Context, token string, reqUserID string, pm mgclients.Page) (mgclients.ClientsPage, error) {
|
||||
func (ms *metricsMiddleware) ListClients(ctx context.Context, token, reqUserID string, pm mgclients.Page) (mgclients.ClientsPage, error) {
|
||||
defer func(begin time.Time) {
|
||||
ms.counter.With("method", "list_things").Add(1)
|
||||
ms.latency.With("method", "list_things").Observe(time.Since(begin).Seconds())
|
||||
@@ -86,7 +86,7 @@ func (ms *metricsMiddleware) UpdateClientSecret(ctx context.Context, token, oldS
|
||||
return ms.svc.UpdateClientSecret(ctx, token, oldSecret, newSecret)
|
||||
}
|
||||
|
||||
func (ms *metricsMiddleware) EnableClient(ctx context.Context, token string, id string) (mgclients.Client, error) {
|
||||
func (ms *metricsMiddleware) EnableClient(ctx context.Context, token, id string) (mgclients.Client, error) {
|
||||
defer func(begin time.Time) {
|
||||
ms.counter.With("method", "enable_thing").Add(1)
|
||||
ms.latency.With("method", "enable_thing").Observe(time.Since(begin).Seconds())
|
||||
@@ -94,7 +94,7 @@ func (ms *metricsMiddleware) EnableClient(ctx context.Context, token string, id
|
||||
return ms.svc.EnableClient(ctx, token, id)
|
||||
}
|
||||
|
||||
func (ms *metricsMiddleware) DisableClient(ctx context.Context, token string, id string) (mgclients.Client, error) {
|
||||
func (ms *metricsMiddleware) DisableClient(ctx context.Context, token, id string) (mgclients.Client, error) {
|
||||
defer func(begin time.Time) {
|
||||
ms.counter.With("method", "disable_thing").Add(1)
|
||||
ms.latency.With("method", "disable_thing").Observe(time.Since(begin).Seconds())
|
||||
@@ -126,7 +126,7 @@ func (ms *metricsMiddleware) Authorize(ctx context.Context, req *magistrala.Auth
|
||||
return ms.svc.Authorize(ctx, req)
|
||||
}
|
||||
|
||||
func (ms *metricsMiddleware) Share(ctx context.Context, token, id string, relation string, userids ...string) error {
|
||||
func (ms *metricsMiddleware) Share(ctx context.Context, token, id, relation string, userids ...string) error {
|
||||
defer func(begin time.Time) {
|
||||
ms.counter.With("method", "share").Add(1)
|
||||
ms.latency.With("method", "share").Observe(time.Since(begin).Seconds())
|
||||
@@ -134,7 +134,7 @@ func (ms *metricsMiddleware) Share(ctx context.Context, token, id string, relati
|
||||
return ms.svc.Share(ctx, token, id, relation, userids...)
|
||||
}
|
||||
|
||||
func (ms *metricsMiddleware) Unshare(ctx context.Context, token, id string, relation string, userids ...string) error {
|
||||
func (ms *metricsMiddleware) Unshare(ctx context.Context, token, id, relation string, userids ...string) error {
|
||||
defer func(begin time.Time) {
|
||||
ms.counter.With("method", "unshare").Add(1)
|
||||
ms.latency.With("method", "unshare").Observe(time.Since(begin).Seconds())
|
||||
|
||||
Vendored
+1
-1
@@ -33,7 +33,7 @@ func NewCache(client *redis.Client, duration time.Duration) things.Cache {
|
||||
}
|
||||
}
|
||||
|
||||
func (tc *thingCache) Save(ctx context.Context, thingKey string, thingID string) error {
|
||||
func (tc *thingCache) Save(ctx context.Context, thingKey, thingID string) error {
|
||||
tkey := fmt.Sprintf("%s:%s", keyPrefix, thingKey)
|
||||
if err := tc.client.Set(ctx, tkey, thingID, tc.keyDuration).Err(); err != nil {
|
||||
return errors.Wrap(errors.ErrCreateEntity, err)
|
||||
|
||||
@@ -125,7 +125,7 @@ func (es *eventStore) ViewClientPerms(ctx context.Context, token, id string) ([]
|
||||
return permissions, nil
|
||||
}
|
||||
|
||||
func (es *eventStore) ListClients(ctx context.Context, token string, reqUserID string, pm mgclients.Page) (mgclients.ClientsPage, error) {
|
||||
func (es *eventStore) ListClients(ctx context.Context, token, reqUserID string, pm mgclients.Page) (mgclients.ClientsPage, error) {
|
||||
cp, err := es.svc.ListClients(ctx, token, reqUserID, pm)
|
||||
if err != nil {
|
||||
return cp, err
|
||||
@@ -222,7 +222,7 @@ func (es *eventStore) Authorize(ctx context.Context, req *magistrala.AuthorizeRe
|
||||
return thingID, nil
|
||||
}
|
||||
|
||||
func (es *eventStore) Share(ctx context.Context, token, id string, relation string, userids ...string) error {
|
||||
func (es *eventStore) Share(ctx context.Context, token, id, relation string, userids ...string) error {
|
||||
if err := es.svc.Share(ctx, token, id, relation, userids...); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -237,7 +237,7 @@ func (es *eventStore) Share(ctx context.Context, token, id string, relation stri
|
||||
return es.Publish(ctx, event)
|
||||
}
|
||||
|
||||
func (es *eventStore) Unshare(ctx context.Context, token, id string, relation string, userids ...string) error {
|
||||
func (es *eventStore) Unshare(ctx context.Context, token, id, relation string, userids ...string) error {
|
||||
if err := es.svc.Unshare(ctx, token, id, relation, userids...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
+3
-3
@@ -126,7 +126,7 @@ func (svc service) CreateThings(ctx context.Context, token string, cls ...mgclie
|
||||
return saved, nil
|
||||
}
|
||||
|
||||
func (svc service) ViewClient(ctx context.Context, token string, id string) (mgclients.Client, error) {
|
||||
func (svc service) ViewClient(ctx context.Context, token, id string) (mgclients.Client, error) {
|
||||
_, err := svc.authorize(ctx, auth.UserType, auth.TokenKind, token, auth.ViewPermission, auth.ThingType, id)
|
||||
if err != nil {
|
||||
return mgclients.Client{}, errors.Wrap(svcerr.ErrAuthorization, err)
|
||||
@@ -135,7 +135,7 @@ func (svc service) ViewClient(ctx context.Context, token string, id string) (mgc
|
||||
return svc.clients.RetrieveByID(ctx, id)
|
||||
}
|
||||
|
||||
func (svc service) ViewClientPerms(ctx context.Context, token string, id string) ([]string, error) {
|
||||
func (svc service) ViewClientPerms(ctx context.Context, token, id string) ([]string, error) {
|
||||
res, err := svc.identify(ctx, token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -151,7 +151,7 @@ func (svc service) ViewClientPerms(ctx context.Context, token string, id string)
|
||||
return permissions, nil
|
||||
}
|
||||
|
||||
func (svc service) ListClients(ctx context.Context, token string, reqUserID string, pm mgclients.Page) (mgclients.ClientsPage, error) {
|
||||
func (svc service) ListClients(ctx context.Context, token, reqUserID string, pm mgclients.Page) (mgclients.ClientsPage, error) {
|
||||
var ids []string
|
||||
|
||||
res, err := svc.identify(ctx, token)
|
||||
|
||||
@@ -34,21 +34,21 @@ func (tm *tracingMiddleware) CreateThings(ctx context.Context, token string, cli
|
||||
}
|
||||
|
||||
// ViewClient traces the "ViewClient" operation of the wrapped policies.Service.
|
||||
func (tm *tracingMiddleware) ViewClient(ctx context.Context, token string, id string) (mgclients.Client, error) {
|
||||
func (tm *tracingMiddleware) ViewClient(ctx context.Context, token, id string) (mgclients.Client, error) {
|
||||
ctx, span := tm.tracer.Start(ctx, "svc_view_client", trace.WithAttributes(attribute.String("id", id)))
|
||||
defer span.End()
|
||||
return tm.svc.ViewClient(ctx, token, id)
|
||||
}
|
||||
|
||||
// ViewClientPerms traces the "ViewClientPerms" operation of the wrapped policies.Service.
|
||||
func (tm *tracingMiddleware) ViewClientPerms(ctx context.Context, token string, id string) ([]string, error) {
|
||||
func (tm *tracingMiddleware) ViewClientPerms(ctx context.Context, token, id string) ([]string, error) {
|
||||
ctx, span := tm.tracer.Start(ctx, "svc_view_client_permissions", trace.WithAttributes(attribute.String("id", id)))
|
||||
defer span.End()
|
||||
return tm.svc.ViewClientPerms(ctx, token, id)
|
||||
}
|
||||
|
||||
// ListClients traces the "ListClients" operation of the wrapped policies.Service.
|
||||
func (tm *tracingMiddleware) ListClients(ctx context.Context, token string, reqUserID string, pm mgclients.Page) (mgclients.ClientsPage, error) {
|
||||
func (tm *tracingMiddleware) ListClients(ctx context.Context, token, reqUserID string, pm mgclients.Page) (mgclients.ClientsPage, error) {
|
||||
ctx, span := tm.tracer.Start(ctx, "svc_list_clients")
|
||||
defer span.End()
|
||||
return tm.svc.ListClients(ctx, token, reqUserID, pm)
|
||||
@@ -121,14 +121,14 @@ func (tm *tracingMiddleware) Authorize(ctx context.Context, req *magistrala.Auth
|
||||
}
|
||||
|
||||
// Share traces the "Share" operation of the wrapped things.Service.
|
||||
func (tm *tracingMiddleware) Share(ctx context.Context, token, id string, relation string, userids ...string) error {
|
||||
func (tm *tracingMiddleware) Share(ctx context.Context, token, id, relation string, userids ...string) error {
|
||||
ctx, span := tm.tracer.Start(ctx, "share", trace.WithAttributes(attribute.String("id", id), attribute.String("relation", relation), attribute.StringSlice("user_ids", userids)))
|
||||
defer span.End()
|
||||
return tm.svc.Share(ctx, token, id, relation, userids...)
|
||||
}
|
||||
|
||||
// Unshare traces the "Unshare" operation of the wrapped things.Service.
|
||||
func (tm *tracingMiddleware) Unshare(ctx context.Context, token, id string, relation string, userids ...string) error {
|
||||
func (tm *tracingMiddleware) Unshare(ctx context.Context, token, id, relation string, userids ...string) error {
|
||||
ctx, span := tm.tracer.Start(ctx, "unshare", trace.WithAttributes(attribute.String("id", id), attribute.String("relation", relation), attribute.StringSlice("user_ids", userids)))
|
||||
defer span.End()
|
||||
return tm.svc.Unshare(ctx, token, id, relation, userids...)
|
||||
|
||||
@@ -18,7 +18,7 @@ import (
|
||||
authmocks "github.com/absmach/magistrala/auth/mocks"
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
"github.com/absmach/magistrala/internal/testsutil"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/twins"
|
||||
httpapi "github.com/absmach/magistrala/twins/api/http"
|
||||
"github.com/absmach/magistrala/twins/mocks"
|
||||
@@ -87,7 +87,7 @@ func (tr testRequest) make() (*http.Response, error) {
|
||||
}
|
||||
|
||||
func newServer(svc twins.Service) *httptest.Server {
|
||||
logger := logger.NewMock()
|
||||
logger := mglog.NewMock()
|
||||
mux := httpapi.MakeHandler(svc, logger, instanceID)
|
||||
return httptest.NewServer(mux)
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
|
||||
"github.com/absmach/magistrala"
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
svcerr "github.com/absmach/magistrala/pkg/errors/service"
|
||||
"github.com/absmach/magistrala/twins"
|
||||
@@ -32,7 +32,7 @@ const (
|
||||
)
|
||||
|
||||
// MakeHandler returns a HTTP handler for API endpoints.
|
||||
func MakeHandler(svc twins.Service, logger logger.Logger, instanceID string) http.Handler {
|
||||
func MakeHandler(svc twins.Service, logger mglog.Logger, instanceID string) http.Handler {
|
||||
opts := []kithttp.ServerOption{
|
||||
kithttp.ServerErrorEncoder(apiutil.LoggingErrorEncoder(logger, encodeError)),
|
||||
}
|
||||
|
||||
@@ -66,7 +66,7 @@ func (lm *loggingMiddleware) ViewTwin(ctx context.Context, token, twinID string)
|
||||
return lm.svc.ViewTwin(ctx, token, twinID)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) ListTwins(ctx context.Context, token string, offset uint64, limit uint64, name string, metadata twins.Metadata) (page twins.Page, err error) {
|
||||
func (lm *loggingMiddleware) ListTwins(ctx context.Context, token string, offset, limit uint64, name string, metadata twins.Metadata) (page twins.Page, err error) {
|
||||
defer func(begin time.Time) {
|
||||
message := fmt.Sprintf("Method list_twins for token %s took %s to complete", token, time.Since(begin))
|
||||
if err != nil {
|
||||
@@ -92,7 +92,7 @@ func (lm *loggingMiddleware) SaveStates(ctx context.Context, msg *messaging.Mess
|
||||
return lm.svc.SaveStates(ctx, msg)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) ListStates(ctx context.Context, token string, offset uint64, limit uint64, twinID string) (page twins.StatesPage, err error) {
|
||||
func (lm *loggingMiddleware) ListStates(ctx context.Context, token string, offset, limit uint64, twinID string) (page twins.StatesPage, err error) {
|
||||
defer func(begin time.Time) {
|
||||
message := fmt.Sprintf("Method list_states for token %s took %s to complete", token, time.Since(begin))
|
||||
if err != nil {
|
||||
|
||||
@@ -58,7 +58,7 @@ func (ms *metricsMiddleware) ViewTwin(ctx context.Context, token, twinID string)
|
||||
return ms.svc.ViewTwin(ctx, token, twinID)
|
||||
}
|
||||
|
||||
func (ms *metricsMiddleware) ListTwins(ctx context.Context, token string, offset uint64, limit uint64, name string, metadata twins.Metadata) (page twins.Page, err error) {
|
||||
func (ms *metricsMiddleware) ListTwins(ctx context.Context, token string, offset, limit uint64, name string, metadata twins.Metadata) (page twins.Page, err error) {
|
||||
defer func(begin time.Time) {
|
||||
ms.counter.With("method", "list_twins").Add(1)
|
||||
ms.latency.With("method", "list_twins").Observe(time.Since(begin).Seconds())
|
||||
@@ -76,7 +76,7 @@ func (ms *metricsMiddleware) SaveStates(ctx context.Context, msg *messaging.Mess
|
||||
return ms.svc.SaveStates(ctx, msg)
|
||||
}
|
||||
|
||||
func (ms *metricsMiddleware) ListStates(ctx context.Context, token string, offset uint64, limit uint64, twinID string) (st twins.StatesPage, err error) {
|
||||
func (ms *metricsMiddleware) ListStates(ctx context.Context, token string, offset, limit uint64, twinID string) (st twins.StatesPage, err error) {
|
||||
defer func(begin time.Time) {
|
||||
ms.counter.With("method", "list_states").Add(1)
|
||||
ms.latency.With("method", "list_states").Observe(time.Since(begin).Seconds())
|
||||
|
||||
@@ -101,7 +101,7 @@ func (es eventStore) RemoveTwin(ctx context.Context, token, id string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (es eventStore) ListTwins(ctx context.Context, token string, offset uint64, limit uint64, name string, metadata twins.Metadata) (twins.Page, error) {
|
||||
func (es eventStore) ListTwins(ctx context.Context, token string, offset, limit uint64, name string, metadata twins.Metadata) (twins.Page, error) {
|
||||
tp, err := es.svc.ListTwins(ctx, token, offset, limit, name, metadata)
|
||||
if err != nil {
|
||||
return tp, err
|
||||
@@ -120,7 +120,7 @@ func (es eventStore) ListTwins(ctx context.Context, token string, offset uint64,
|
||||
return tp, nil
|
||||
}
|
||||
|
||||
func (es eventStore) ListStates(ctx context.Context, token string, offset uint64, limit uint64, id string) (twins.StatesPage, error) {
|
||||
func (es eventStore) ListStates(ctx context.Context, token string, offset, limit uint64, id string) (twins.StatesPage, error) {
|
||||
sp, err := es.svc.ListStates(ctx, token, offset, limit, id)
|
||||
if err != nil {
|
||||
return sp, err
|
||||
|
||||
@@ -8,6 +8,6 @@ import "fmt"
|
||||
// Since mocks will store data in map, and they need to resemble the real
|
||||
// identifiers as much as possible, a key will be created as combination of
|
||||
// owner and their id. This will allow searching by prefix or suffix.
|
||||
func key(owner string, id string) string {
|
||||
func key(owner, id string) string {
|
||||
return fmt.Sprintf("%s-%s", owner, id)
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ func NewService() (twins.Service, *authmocks.Service) {
|
||||
}
|
||||
|
||||
// CreateDefinition creates twin definition.
|
||||
func CreateDefinition(channels []string, subtopics []string) twins.Definition {
|
||||
func CreateDefinition(channels, subtopics []string) twins.Definition {
|
||||
var def twins.Definition
|
||||
for i := range channels {
|
||||
attr := twins.Attribute{
|
||||
@@ -47,7 +47,7 @@ func CreateDefinition(channels []string, subtopics []string) twins.Definition {
|
||||
}
|
||||
|
||||
// CreateTwin creates twin.
|
||||
func CreateTwin(channels []string, subtopics []string) twins.Twin {
|
||||
func CreateTwin(channels, subtopics []string) twins.Twin {
|
||||
id++
|
||||
return twins.Twin{
|
||||
ID: strconv.Itoa(id),
|
||||
|
||||
@@ -52,7 +52,7 @@ func (srm *stateRepositoryMock) Count(ctx context.Context, tw twins.Twin) (int64
|
||||
return int64(len(srm.states)), nil
|
||||
}
|
||||
|
||||
func (srm *stateRepositoryMock) RetrieveAll(ctx context.Context, offset uint64, limit uint64, twinID string) (twins.StatesPage, error) {
|
||||
func (srm *stateRepositoryMock) RetrieveAll(ctx context.Context, offset, limit uint64, twinID string) (twins.StatesPage, error) {
|
||||
srm.mu.Lock()
|
||||
defer srm.mu.Unlock()
|
||||
|
||||
|
||||
@@ -85,7 +85,7 @@ func (trm *twinRepositoryMock) RetrieveByAttribute(ctx context.Context, channel,
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
func (trm *twinRepositoryMock) RetrieveAll(_ context.Context, owner string, offset uint64, limit uint64, name string, metadata twins.Metadata) (twins.Page, error) {
|
||||
func (trm *twinRepositoryMock) RetrieveAll(_ context.Context, owner string, offset, limit uint64, name string, metadata twins.Metadata) (twins.Page, error) {
|
||||
trm.mu.Lock()
|
||||
defer trm.mu.Unlock()
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
)
|
||||
@@ -20,7 +20,7 @@ type Config struct {
|
||||
}
|
||||
|
||||
// Connect creates a connection to the MongoDB instance.
|
||||
func Connect(cfg Config, logger logger.Logger) (*mongo.Database, error) {
|
||||
func Connect(cfg Config, logger mglog.Logger) (*mongo.Database, error) {
|
||||
addr := fmt.Sprintf("mongodb://%s:%s", cfg.Host, cfg.Port)
|
||||
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(addr))
|
||||
if err != nil {
|
||||
|
||||
@@ -69,7 +69,7 @@ func (sr *stateRepository) Count(ctx context.Context, tw twins.Twin) (int64, err
|
||||
}
|
||||
|
||||
// RetrieveAll retrieves the subset of states related to twin specified by id.
|
||||
func (sr *stateRepository) RetrieveAll(ctx context.Context, offset uint64, limit uint64, twinID string) (twins.StatesPage, error) {
|
||||
func (sr *stateRepository) RetrieveAll(ctx context.Context, offset, limit uint64, twinID string) (twins.StatesPage, error) {
|
||||
coll := sr.db.Collection(statesCollection)
|
||||
|
||||
findOptions := options.Find()
|
||||
|
||||
@@ -131,7 +131,7 @@ func (tr *twinRepository) RetrieveByAttribute(ctx context.Context, channel, subt
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
func (tr *twinRepository) RetrieveAll(ctx context.Context, owner string, offset uint64, limit uint64, name string, metadata twins.Metadata) (twins.Page, error) {
|
||||
func (tr *twinRepository) RetrieveAll(ctx context.Context, owner string, offset, limit uint64, name string, metadata twins.Metadata) (twins.Page, error) {
|
||||
coll := tr.db.Collection(twinsCollection)
|
||||
|
||||
findOptions := options.Find()
|
||||
|
||||
+5
-5
@@ -11,7 +11,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/absmach/magistrala"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
svcerr "github.com/absmach/magistrala/pkg/errors/service"
|
||||
"github.com/absmach/magistrala/pkg/messaging"
|
||||
@@ -80,13 +80,13 @@ type twinsService struct {
|
||||
idProvider magistrala.IDProvider
|
||||
channelID string
|
||||
twinCache TwinCache
|
||||
logger logger.Logger
|
||||
logger mglog.Logger
|
||||
}
|
||||
|
||||
var _ Service = (*twinsService)(nil)
|
||||
|
||||
// New instantiates the twins service implementation.
|
||||
func New(publisher messaging.Publisher, auth magistrala.AuthServiceClient, twins TwinRepository, tcache TwinCache, sr StateRepository, idp magistrala.IDProvider, chann string, logger logger.Logger) Service {
|
||||
func New(publisher messaging.Publisher, auth magistrala.AuthServiceClient, twins TwinRepository, tcache TwinCache, sr StateRepository, idp magistrala.IDProvider, chann string, logger mglog.Logger) Service {
|
||||
return &twinsService{
|
||||
publisher: publisher,
|
||||
auth: auth,
|
||||
@@ -227,7 +227,7 @@ func (ts *twinsService) RemoveTwin(ctx context.Context, token, twinID string) (e
|
||||
return ts.twinCache.Remove(ctx, twinID)
|
||||
}
|
||||
|
||||
func (ts *twinsService) ListTwins(ctx context.Context, token string, offset uint64, limit uint64, name string, metadata Metadata) (Page, error) {
|
||||
func (ts *twinsService) ListTwins(ctx context.Context, token string, offset, limit uint64, name string, metadata Metadata) (Page, error) {
|
||||
res, err := ts.auth.Identify(ctx, &magistrala.IdentityReq{Token: token})
|
||||
if err != nil {
|
||||
return Page{}, errors.Wrap(svcerr.ErrAuthentication, err)
|
||||
@@ -236,7 +236,7 @@ func (ts *twinsService) ListTwins(ctx context.Context, token string, offset uint
|
||||
return ts.twins.RetrieveAll(ctx, res.GetId(), offset, limit, name, metadata)
|
||||
}
|
||||
|
||||
func (ts *twinsService) ListStates(ctx context.Context, token string, offset uint64, limit uint64, twinID string) (StatesPage, error) {
|
||||
func (ts *twinsService) ListStates(ctx context.Context, token string, offset, limit uint64, twinID string) (StatesPage, error) {
|
||||
_, err := ts.auth.Identify(ctx, &magistrala.IdentityReq{Token: token})
|
||||
if err != nil {
|
||||
return StatesPage{}, svcerr.ErrAuthentication
|
||||
|
||||
+2
-2
@@ -12,7 +12,7 @@ import (
|
||||
"github.com/absmach/magistrala/internal/api"
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
gapi "github.com/absmach/magistrala/internal/groups/api"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
"github.com/absmach/magistrala/pkg/groups"
|
||||
"github.com/go-chi/chi/v5"
|
||||
@@ -22,7 +22,7 @@ import (
|
||||
)
|
||||
|
||||
// MakeHandler returns a HTTP handler for Groups API endpoints.
|
||||
func groupsHandler(svc groups.Service, r *chi.Mux, logger logger.Logger) http.Handler {
|
||||
func groupsHandler(svc groups.Service, r *chi.Mux, logger mglog.Logger) http.Handler {
|
||||
opts := []kithttp.ServerOption{
|
||||
kithttp.ServerErrorEncoder(apiutil.LoggingErrorEncoder(logger, api.EncodeError)),
|
||||
}
|
||||
|
||||
@@ -157,7 +157,7 @@ func (ms *metricsMiddleware) UpdateClientRole(ctx context.Context, token string,
|
||||
}
|
||||
|
||||
// EnableClient instruments EnableClient method with metrics.
|
||||
func (ms *metricsMiddleware) EnableClient(ctx context.Context, token string, id string) (mgclients.Client, error) {
|
||||
func (ms *metricsMiddleware) EnableClient(ctx context.Context, token, id string) (mgclients.Client, error) {
|
||||
defer func(begin time.Time) {
|
||||
ms.counter.With("method", "enable_client").Add(1)
|
||||
ms.latency.With("method", "enable_client").Observe(time.Since(begin).Seconds())
|
||||
@@ -166,7 +166,7 @@ func (ms *metricsMiddleware) EnableClient(ctx context.Context, token string, id
|
||||
}
|
||||
|
||||
// DisableClient instruments DisableClient method with metrics.
|
||||
func (ms *metricsMiddleware) DisableClient(ctx context.Context, token string, id string) (mgclients.Client, error) {
|
||||
func (ms *metricsMiddleware) DisableClient(ctx context.Context, token, id string) (mgclients.Client, error) {
|
||||
defer func(begin time.Time) {
|
||||
ms.counter.With("method", "disable_client").Add(1)
|
||||
ms.latency.With("method", "disable_client").Observe(time.Since(begin).Seconds())
|
||||
|
||||
+4
-4
@@ -51,10 +51,10 @@ type service struct {
|
||||
}
|
||||
|
||||
// NewService returns a new Users service implementation.
|
||||
func NewService(crepo postgres.Repository, auth magistrala.AuthServiceClient, emailer Emailer, hasher Hasher, idp magistrala.IDProvider, pr *regexp.Regexp, selfRegister bool) Service {
|
||||
func NewService(crepo postgres.Repository, authClient magistrala.AuthServiceClient, emailer Emailer, hasher Hasher, idp magistrala.IDProvider, pr *regexp.Regexp, selfRegister bool) Service {
|
||||
return service{
|
||||
clients: crepo,
|
||||
auth: auth,
|
||||
auth: authClient,
|
||||
hasher: hasher,
|
||||
email: emailer,
|
||||
idProvider: idp,
|
||||
@@ -134,7 +134,7 @@ func (svc service) RefreshToken(ctx context.Context, refreshToken, domainID stri
|
||||
return svc.auth.Refresh(ctx, &magistrala.RefreshReq{RefreshToken: refreshToken, DomainId: &d})
|
||||
}
|
||||
|
||||
func (svc service) ViewClient(ctx context.Context, token string, id string) (mgclients.Client, error) {
|
||||
func (svc service) ViewClient(ctx context.Context, token, id string) (mgclients.Client, error) {
|
||||
tokenUserID, err := svc.Identify(ctx, token)
|
||||
if err != nil {
|
||||
return mgclients.Client{}, errors.Wrap(svcerr.ErrAuthentication, err)
|
||||
@@ -415,7 +415,7 @@ func (svc service) changeClientStatus(ctx context.Context, token string, client
|
||||
return svc.clients.ChangeStatus(ctx, client)
|
||||
}
|
||||
|
||||
func (svc service) ListMembers(ctx context.Context, token, objectKind string, objectID string, pm mgclients.Page) (mgclients.MembersPage, error) {
|
||||
func (svc service) ListMembers(ctx context.Context, token, objectKind, objectID string, pm mgclients.Page) (mgclients.MembersPage, error) {
|
||||
var objectType string
|
||||
var authzPerm string
|
||||
switch objectKind {
|
||||
|
||||
@@ -50,7 +50,7 @@ func (tm *tracingMiddleware) RefreshToken(ctx context.Context, accessToken, doma
|
||||
}
|
||||
|
||||
// ViewClient traces the "ViewClient" operation of the wrapped clients.Service.
|
||||
func (tm *tracingMiddleware) ViewClient(ctx context.Context, token string, id string) (mgclients.Client, error) {
|
||||
func (tm *tracingMiddleware) ViewClient(ctx context.Context, token, id string) (mgclients.Client, error) {
|
||||
ctx, span := tm.tracer.Start(ctx, "svc_view_client", trace.WithAttributes(attribute.String("id", id)))
|
||||
defer span.End()
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ import (
|
||||
|
||||
"github.com/absmach/magistrala"
|
||||
authmocks "github.com/absmach/magistrala/auth/mocks"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/ws"
|
||||
"github.com/absmach/magistrala/ws/api"
|
||||
"github.com/absmach/magistrala/ws/mocks"
|
||||
@@ -42,13 +42,13 @@ func newService(auth magistrala.AuthzServiceClient) (ws.Service, mocks.MockPubSu
|
||||
}
|
||||
|
||||
func newHTTPServer(svc ws.Service) *httptest.Server {
|
||||
mux := api.MakeHandler(context.Background(), svc, logger.NewMock(), instanceID)
|
||||
mux := api.MakeHandler(context.Background(), svc, mglog.NewMock(), instanceID)
|
||||
return httptest.NewServer(mux)
|
||||
}
|
||||
|
||||
func newProxyHTPPServer(svc session.Handler, targetServer *httptest.Server) (*httptest.Server, error) {
|
||||
url := strings.ReplaceAll(targetServer.URL, "http", "ws")
|
||||
mp, err := websockets.NewProxy("", url, logger.NewMock(), svc)
|
||||
turl := strings.ReplaceAll(targetServer.URL, "http", "ws")
|
||||
mp, err := websockets.NewProxy("", turl, mglog.NewMock(), svc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -83,8 +83,8 @@ func handshake(tsURL, chanID, subtopic, thingKey string, addHeader bool) (*webso
|
||||
header.Add("Authorization", thingKey)
|
||||
}
|
||||
|
||||
url, _ := makeURL(tsURL, chanID, subtopic, thingKey, addHeader)
|
||||
conn, res, errRet := websocket.DefaultDialer.Dial(url, header)
|
||||
turl, _ := makeURL(tsURL, chanID, subtopic, thingKey, addHeader)
|
||||
conn, res, errRet := websocket.DefaultDialer.Dial(turl, header)
|
||||
|
||||
return conn, res, errRet
|
||||
}
|
||||
@@ -94,7 +94,7 @@ func TestHandshake(t *testing.T) {
|
||||
svc, pubsub := newService(auth)
|
||||
target := newHTTPServer(svc)
|
||||
defer target.Close()
|
||||
handler := ws.NewHandler(pubsub, logger.NewMock(), auth)
|
||||
handler := ws.NewHandler(pubsub, mglog.NewMock(), auth)
|
||||
ts, err := newProxyHTPPServer(handler, target)
|
||||
require.Nil(t, err)
|
||||
defer ts.Close()
|
||||
|
||||
+3
-3
@@ -12,7 +12,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/absmach/magistrala"
|
||||
"github.com/absmach/magistrala/logger"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
svcerr "github.com/absmach/magistrala/pkg/errors/service"
|
||||
"github.com/absmach/magistrala/pkg/messaging"
|
||||
@@ -56,11 +56,11 @@ var channelRegExp = regexp.MustCompile(`^\/?channels\/([\w\-]+)\/messages(\/[^?]
|
||||
type handler struct {
|
||||
pubsub messaging.PubSub
|
||||
auth magistrala.AuthzServiceClient
|
||||
logger logger.Logger
|
||||
logger mglog.Logger
|
||||
}
|
||||
|
||||
// NewHandler creates new Handler entity.
|
||||
func NewHandler(pubsub messaging.PubSub, logger logger.Logger, auth magistrala.AuthzServiceClient) session.Handler {
|
||||
func NewHandler(pubsub messaging.PubSub, logger mglog.Logger, auth magistrala.AuthzServiceClient) session.Handler {
|
||||
return &handler{
|
||||
logger: logger,
|
||||
pubsub: pubsub,
|
||||
|
||||
@@ -32,7 +32,7 @@ func New(tracer trace.Tracer, svc ws.Service) ws.Service {
|
||||
}
|
||||
|
||||
// Subscribe traces the "Subscribe" operation of the wrapped ws.Service.
|
||||
func (tm *tracingMiddleware) Subscribe(ctx context.Context, thingKey string, chanID string, subtopic string, client *ws.Client) error {
|
||||
func (tm *tracingMiddleware) Subscribe(ctx context.Context, thingKey, chanID, subtopic string, client *ws.Client) error {
|
||||
ctx, span := tm.tracer.Start(ctx, subscribeOP)
|
||||
defer span.End()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user