mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 07:00:25 +00:00
NOISSUE - Add property based testing to HTTP Adapter (#2212)
Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com>
This commit is contained in:
@@ -30,6 +30,7 @@ env:
|
||||
DOMAIN_NAME: demo-test
|
||||
USERS_URL: http://localhost:9002
|
||||
THINGS_URL: http://localhost:9000
|
||||
HTTP_ADAPTER_URL: http://localhost:8008
|
||||
INVITATIONS_URL: http://localhost:9020
|
||||
AUTH_URL: http://localhost:8189
|
||||
BOOTSTRAP_URL: http://localhost:9013
|
||||
@@ -54,7 +55,7 @@ jobs:
|
||||
- name: Install Go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: 1.21.x
|
||||
go-version: 1.22.x
|
||||
cache-dependency-path: "go.sum"
|
||||
|
||||
- name: Build images
|
||||
@@ -69,6 +70,8 @@ jobs:
|
||||
export DOMAIN_ID=$(curl -sSX POST $DOMAINS_URL -H "Content-Type: application/json" -H "Authorization: Bearer $USER_TOKEN" -d "{\"name\":\"$DOMAIN_NAME\",\"alias\":\"$DOMAIN_NAME\"}" | jq -r .id)
|
||||
export USER_TOKEN=$(curl -sSX POST $TOKENS_URL -H "Content-Type: application/json" -d "{\"identity\": \"$USER_IDENTITY\",\"secret\": \"$USER_SECRET\",\"domain_id\": \"$DOMAIN_ID\"}" | jq -r .access_token)
|
||||
echo "USER_TOKEN=$USER_TOKEN" >> $GITHUB_ENV
|
||||
export THING_SECRET=$(magistrala-cli provision test | /usr/bin/grep -Eo '"secret": "[^"]+"' | awk 'NR % 2 == 0' | sed 's/"secret": "\(.*\)"/\1/')
|
||||
echo "THING_SECRET=$THING_SECRET" >> $GITHUB_ENV
|
||||
|
||||
- name: Check for changes in specific paths
|
||||
uses: dorny/paths-filter@v3
|
||||
@@ -150,6 +153,16 @@ jobs:
|
||||
report: false
|
||||
args: '--header "Authorization: Bearer ${{ env.USER_TOKEN }}" --contrib-openapi-formats-uuid --hypothesis-suppress-health-check=filter_too_much --stateful=links'
|
||||
|
||||
- name: Run HTTP Adapter API tests
|
||||
if: steps.changes.outputs.http == 'true'
|
||||
uses: schemathesis/action@v1
|
||||
with:
|
||||
schema: api/openapi/http.yml
|
||||
base-url: ${{ env.HTTP_ADAPTER_URL }}
|
||||
checks: all
|
||||
report: false
|
||||
args: '--header "Authorization: Thing ${{ env.THING_SECRET }}" --contrib-openapi-formats-uuid --hypothesis-suppress-health-check=filter_too_much --stateful=links'
|
||||
|
||||
- name: Run Invitations API tests
|
||||
if: steps.changes.outputs.invitations == 'true'
|
||||
uses: schemathesis/action@v1
|
||||
|
||||
@@ -20,7 +20,7 @@ jobs:
|
||||
- name: Install Go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: 1.21.x
|
||||
go-version: 1.22.x
|
||||
cache-dependency-path: "go.sum"
|
||||
|
||||
- name: Run tests
|
||||
|
||||
@@ -21,7 +21,7 @@ jobs:
|
||||
- name: Install Go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: 1.21.x
|
||||
go-version: 1.22.x
|
||||
cache-dependency-path: "go.sum"
|
||||
|
||||
- name: Check for changes in go.mod
|
||||
|
||||
@@ -20,13 +20,13 @@ jobs:
|
||||
- name: Setup Go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: 1.21.x
|
||||
go-version: 1.22.x
|
||||
cache-dependency-path: "go.sum"
|
||||
|
||||
- name: golangci-lint
|
||||
uses: golangci/golangci-lint-action@v6
|
||||
with:
|
||||
version: v1.56.1
|
||||
version: v1.57.2
|
||||
|
||||
- name: Build all Binaries
|
||||
run: |
|
||||
@@ -54,7 +54,7 @@ jobs:
|
||||
- name: Setup Go
|
||||
uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: 1.21.x
|
||||
go-version: 1.22.x
|
||||
cache-dependency-path: "go.sum"
|
||||
|
||||
- name: Check for changes in specific paths
|
||||
|
||||
@@ -31,9 +31,6 @@ linters-settings:
|
||||
|
||||
gocritic:
|
||||
enabled-checks:
|
||||
- captLocal
|
||||
- singleCaseSwitch
|
||||
- switchTrue
|
||||
- importShadow
|
||||
- httpNoBody
|
||||
- paramTypeCombine
|
||||
|
||||
@@ -141,17 +141,34 @@ define test_api_service
|
||||
exit 1; \
|
||||
fi
|
||||
|
||||
st run api/openapi/$(svc).yml \
|
||||
--checks all \
|
||||
--base-url $(2) \
|
||||
--header "Authorization: Bearer $(USER_TOKEN)" \
|
||||
--contrib-openapi-formats-uuid \
|
||||
--hypothesis-suppress-health-check=filter_too_much \
|
||||
--stateful=links
|
||||
@if [ "$(svc)" = "http" ] && [ -z "$(THING_SECRET)" ]; then \
|
||||
echo "THING_SECRET is not set"; \
|
||||
echo "Please set it to a valid secret"; \
|
||||
exit 1; \
|
||||
fi
|
||||
|
||||
@if [ "$(svc)" = "http" ]; then \
|
||||
st run api/openapi/$(svc).yml \
|
||||
--checks all \
|
||||
--base-url $(2) \
|
||||
--header "Authorization: Thing $(THING_SECRET)" \
|
||||
--contrib-openapi-formats-uuid \
|
||||
--hypothesis-suppress-health-check=filter_too_much \
|
||||
--stateful=links; \
|
||||
else \
|
||||
st run api/openapi/$(svc).yml \
|
||||
--checks all \
|
||||
--base-url $(2) \
|
||||
--header "Authorization: Bearer $(USER_TOKEN)" \
|
||||
--contrib-openapi-formats-uuid \
|
||||
--hypothesis-suppress-health-check=filter_too_much \
|
||||
--stateful=links; \
|
||||
fi
|
||||
endef
|
||||
|
||||
test_api_users: TEST_API_URL := http://localhost:9002
|
||||
test_api_things: TEST_API_URL := http://localhost:9000
|
||||
test_api_http: TEST_API_URL := http://localhost:8008
|
||||
test_api_invitations: TEST_API_URL := http://localhost:9020
|
||||
test_api_auth: TEST_API_URL := http://localhost:8189
|
||||
test_api_bootstrap: TEST_API_URL := http://localhost:9013
|
||||
|
||||
@@ -322,6 +322,8 @@ paths:
|
||||
description: Issued new key.
|
||||
"400":
|
||||
description: Failed due to malformed JSON.
|
||||
"401":
|
||||
description: Missing or invalid access token provided.
|
||||
"409":
|
||||
description: Failed due to using already existing ID.
|
||||
"415":
|
||||
@@ -415,6 +417,8 @@ paths:
|
||||
description: Policies deleted.
|
||||
"400":
|
||||
description: Failed due to malformed JSON.
|
||||
"401":
|
||||
description: Missing or invalid access token provided.
|
||||
"404":
|
||||
description: A non-existent entity request.
|
||||
"409":
|
||||
|
||||
@@ -158,7 +158,7 @@ components:
|
||||
HealthRes:
|
||||
description: Service Health Check.
|
||||
content:
|
||||
application/json:
|
||||
application/health+json:
|
||||
schema:
|
||||
$ref: "./schemas/HealthInfo.yml"
|
||||
|
||||
|
||||
@@ -91,6 +91,8 @@ paths:
|
||||
responses:
|
||||
"200":
|
||||
$ref: "#/components/responses/View"
|
||||
"400":
|
||||
description: Failed due to malformed query parameters.
|
||||
"401":
|
||||
description: Missing or invalid access token provided.
|
||||
"403":
|
||||
|
||||
@@ -259,7 +259,6 @@ components:
|
||||
attributes:
|
||||
type: array
|
||||
minItems: 0
|
||||
uniqueItems: true
|
||||
items:
|
||||
$ref: "#/components/schemas/Attribute"
|
||||
TwinReqObj:
|
||||
@@ -300,7 +299,6 @@ components:
|
||||
definitions:
|
||||
type: array
|
||||
minItems: 0
|
||||
uniqueItems: true
|
||||
items:
|
||||
$ref: "#/components/schemas/Definition"
|
||||
metadata:
|
||||
@@ -348,7 +346,6 @@ components:
|
||||
states:
|
||||
type: array
|
||||
minItems: 0
|
||||
uniqueItems: true
|
||||
items:
|
||||
$ref: "#/components/schemas/State"
|
||||
total:
|
||||
|
||||
+23
-9
@@ -6,6 +6,7 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"log"
|
||||
"log/slog"
|
||||
@@ -28,7 +29,8 @@ import (
|
||||
brokerstracing "github.com/absmach/magistrala/pkg/messaging/brokers/tracing"
|
||||
"github.com/absmach/magistrala/pkg/messaging/handler"
|
||||
"github.com/absmach/magistrala/pkg/uuid"
|
||||
mproxy "github.com/absmach/mproxy/pkg/http"
|
||||
"github.com/absmach/mproxy"
|
||||
mproxyhttp "github.com/absmach/mproxy/pkg/http"
|
||||
"github.com/absmach/mproxy/pkg/session"
|
||||
"github.com/caarlos0/env/v10"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
@@ -127,7 +129,7 @@ func main() {
|
||||
svc := newService(pub, authClient, logger, tracer)
|
||||
targetServerCfg := server.Config{Port: targetHTTPPort}
|
||||
|
||||
hs := httpserver.New(ctx, cancel, svcName, targetServerCfg, api.MakeHandler(cfg.InstanceID), logger)
|
||||
hs := httpserver.New(ctx, cancel, svcName, targetServerCfg, api.MakeHandler(logger, cfg.InstanceID), logger)
|
||||
|
||||
if cfg.SendTelemetry {
|
||||
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
|
||||
@@ -161,31 +163,43 @@ func newService(pub messaging.Publisher, tc magistrala.AuthzServiceClient, logge
|
||||
}
|
||||
|
||||
func proxyHTTP(ctx context.Context, cfg server.Config, logger *slog.Logger, sessionHandler session.Handler) error {
|
||||
address := fmt.Sprintf("%s:%s", "", cfg.Port)
|
||||
target := fmt.Sprintf("%s:%s", targetHTTPHost, targetHTTPPort)
|
||||
mp, err := mproxy.NewProxy(address, target, sessionHandler, logger)
|
||||
config := mproxy.Config{
|
||||
Address: fmt.Sprintf("%s:%s", "", cfg.Port),
|
||||
Target: fmt.Sprintf("%s:%s", targetHTTPHost, targetHTTPPort),
|
||||
PathPrefix: "/",
|
||||
}
|
||||
if cfg.CertFile != "" || cfg.KeyFile != "" {
|
||||
tlsCert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
config.TLSConfig = &tls.Config{
|
||||
Certificates: []tls.Certificate{tlsCert},
|
||||
}
|
||||
}
|
||||
mp, err := mproxyhttp.NewProxy(config, sessionHandler, logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
http.HandleFunc("/", mp.Handler)
|
||||
http.HandleFunc("/", mp.ServeHTTP)
|
||||
|
||||
errCh := make(chan error)
|
||||
switch {
|
||||
case cfg.CertFile != "" || cfg.KeyFile != "":
|
||||
go func() {
|
||||
errCh <- mp.ListenTLS(cfg.CertFile, cfg.KeyFile)
|
||||
errCh <- mp.Listen(ctx)
|
||||
}()
|
||||
logger.Info(fmt.Sprintf("%s service https server listening at %s:%s with TLS cert %s and key %s", svcName, cfg.Host, cfg.Port, cfg.CertFile, cfg.KeyFile))
|
||||
default:
|
||||
go func() {
|
||||
errCh <- mp.Listen()
|
||||
errCh <- mp.Listen(ctx)
|
||||
}()
|
||||
logger.Info(fmt.Sprintf("%s service http server listening at %s:%s without TLS", svcName, cfg.Host, cfg.Port))
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Info(fmt.Sprintf("proxy HTTP shutdown at %s", target))
|
||||
logger.Info(fmt.Sprintf("proxy HTTP shutdown at %s", config.Target))
|
||||
return nil
|
||||
case err := <-errCh:
|
||||
return err
|
||||
|
||||
+18
-10
@@ -32,7 +32,8 @@ import (
|
||||
"github.com/absmach/magistrala/pkg/messaging/handler"
|
||||
mqttpub "github.com/absmach/magistrala/pkg/messaging/mqtt"
|
||||
"github.com/absmach/magistrala/pkg/uuid"
|
||||
mp "github.com/absmach/mproxy/pkg/mqtt"
|
||||
"github.com/absmach/mproxy"
|
||||
mproxymqtt "github.com/absmach/mproxy/pkg/mqtt"
|
||||
"github.com/absmach/mproxy/pkg/mqtt/websocket"
|
||||
"github.com/absmach/mproxy/pkg/session"
|
||||
"github.com/caarlos0/env/v10"
|
||||
@@ -209,9 +210,11 @@ func main() {
|
||||
}
|
||||
|
||||
func proxyMQTT(ctx context.Context, cfg config, logger *slog.Logger, sessionHandler session.Handler, interceptor session.Interceptor) error {
|
||||
address := fmt.Sprintf(":%s", cfg.MQTTPort)
|
||||
target := fmt.Sprintf("%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort)
|
||||
mproxy := mp.New(address, target, sessionHandler, interceptor, logger)
|
||||
config := mproxy.Config{
|
||||
Address: fmt.Sprintf(":%s", cfg.MQTTPort),
|
||||
Target: fmt.Sprintf("%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort),
|
||||
}
|
||||
mproxy := mproxymqtt.New(config, sessionHandler, interceptor, logger)
|
||||
|
||||
errCh := make(chan error)
|
||||
go func() {
|
||||
@@ -220,7 +223,7 @@ func proxyMQTT(ctx context.Context, cfg config, logger *slog.Logger, sessionHand
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Info(fmt.Sprintf("proxy MQTT shutdown at %s", target))
|
||||
logger.Info(fmt.Sprintf("proxy MQTT shutdown at %s", config.Target))
|
||||
return nil
|
||||
case err := <-errCh:
|
||||
return err
|
||||
@@ -228,19 +231,24 @@ func proxyMQTT(ctx context.Context, cfg config, logger *slog.Logger, sessionHand
|
||||
}
|
||||
|
||||
func proxyWS(ctx context.Context, cfg config, logger *slog.Logger, sessionHandler session.Handler, interceptor session.Interceptor) error {
|
||||
target := fmt.Sprintf("%s:%s", cfg.HTTPTargetHost, cfg.HTTPTargetPort)
|
||||
wp := websocket.New(target, cfg.HTTPTargetPath, "ws", sessionHandler, interceptor, logger)
|
||||
http.Handle("/mqtt", wp.Handler())
|
||||
config := mproxy.Config{
|
||||
Address: fmt.Sprintf("%s:%s", "", cfg.HTTPPort),
|
||||
Target: fmt.Sprintf("%s:%s", cfg.HTTPTargetHost, cfg.HTTPTargetPort),
|
||||
PathPrefix: "/mqtt",
|
||||
}
|
||||
|
||||
wp := websocket.New(config, sessionHandler, interceptor, logger)
|
||||
http.HandleFunc("/mqtt", wp.ServeHTTP)
|
||||
|
||||
errCh := make(chan error)
|
||||
|
||||
go func() {
|
||||
errCh <- wp.Listen(cfg.HTTPPort)
|
||||
errCh <- wp.Listen(ctx)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Info(fmt.Sprintf("proxy MQTT WS shutdown at %s", target))
|
||||
logger.Info(fmt.Sprintf("proxy MQTT WS shutdown at %s", config.Target))
|
||||
return nil
|
||||
case err := <-errCh:
|
||||
return err
|
||||
|
||||
@@ -5,7 +5,7 @@ go 1.22.2
|
||||
require (
|
||||
github.com/0x6flab/namegenerator v1.3.1
|
||||
github.com/absmach/callhome v0.14.0
|
||||
github.com/absmach/mproxy v0.4.2
|
||||
github.com/absmach/mproxy v0.4.3-0.20240430090627-27dad4c91c6c
|
||||
github.com/absmach/senml v1.0.5
|
||||
github.com/authzed/authzed-go v0.11.1
|
||||
github.com/authzed/grpcutil v0.0.0-20240123194739-2ea1e3d2d98b
|
||||
@@ -72,6 +72,7 @@ require (
|
||||
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/bitly/go-hostpool v0.1.0 // indirect
|
||||
github.com/caarlos0/env/v11 v11.0.0 // indirect
|
||||
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
|
||||
github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
|
||||
@@ -21,8 +21,8 @@ github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrd
|
||||
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
|
||||
github.com/absmach/callhome v0.14.0 h1:zB4tIZJ1YUmZ1VGHFPfMA/Lo6/Mv19y2dvoOiXj2BWs=
|
||||
github.com/absmach/callhome v0.14.0/go.mod h1:l12UJOfibK4Muvg/AbupHuquNV9qSz/ROdTEPg7f2Vk=
|
||||
github.com/absmach/mproxy v0.4.2 h1:u0ORPxSrUknqbVrC+E1MdsCv/7Q5eWNG7clIwOMV5hk=
|
||||
github.com/absmach/mproxy v0.4.2/go.mod h1:TeXhbHdjihXLVoohSzxvIEFzWu16WDOa91LNduks/N8=
|
||||
github.com/absmach/mproxy v0.4.3-0.20240430090627-27dad4c91c6c h1:wGtfVk3knDUsrUoyOxfyDPK3lJB6Yc6BMePf62UaTOo=
|
||||
github.com/absmach/mproxy v0.4.3-0.20240430090627-27dad4c91c6c/go.mod h1:Nevip6o8u5Zx7l3LTtN8BwlCI5h5KpsnI9YnAxF5RT8=
|
||||
github.com/absmach/senml v1.0.5 h1:zNPRYpGr2Wsb8brAusz8DIfFqemy1a2dNbmMnegY3GE=
|
||||
github.com/absmach/senml v1.0.5/go.mod h1:NDEjk3O4V4YYu9Bs2/+t/AZ/F+0wu05ikgecp+/FsSU=
|
||||
github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ=
|
||||
@@ -44,6 +44,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn
|
||||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
|
||||
github.com/caarlos0/env/v10 v10.0.0 h1:yIHUBZGsyqCnpTkbjk8asUlx6RFhhEs+h7TOBdgdzXA=
|
||||
github.com/caarlos0/env/v10 v10.0.0/go.mod h1:ZfulV76NvVPw3tm591U4SwL3Xx9ldzBP9aGxzeN7G18=
|
||||
github.com/caarlos0/env/v11 v11.0.0 h1:ZIlkOjuL3xoZS0kmUJlF74j2Qj8GMOq3CDLX/Viak8Q=
|
||||
github.com/caarlos0/env/v11 v11.0.0/go.mod h1:2RC3HQu8BQqtEK3V4iHPxj0jOdWdbPpWJ6pOueeU1xM=
|
||||
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
|
||||
github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
|
||||
github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
|
||||
|
||||
@@ -6,11 +6,18 @@ package api
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
"github.com/go-kit/kit/endpoint"
|
||||
)
|
||||
|
||||
func sendMessageEndpoint() endpoint.Endpoint {
|
||||
return func(ctx context.Context, request interface{}) (interface{}, error) {
|
||||
return nil, nil
|
||||
req := request.(publishReq)
|
||||
if err := req.validate(); err != nil {
|
||||
return nil, errors.Wrap(apiutil.ErrValidation, err)
|
||||
}
|
||||
|
||||
return publishMessageRes{}, nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,8 @@ import (
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
pubsub "github.com/absmach/magistrala/pkg/messaging/mocks"
|
||||
mproxy "github.com/absmach/mproxy/pkg/http"
|
||||
"github.com/absmach/mproxy"
|
||||
mproxyhttp "github.com/absmach/mproxy/pkg/http"
|
||||
"github.com/absmach/mproxy/pkg/session"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
@@ -32,16 +33,20 @@ func newService(auth magistrala.AuthzServiceClient) (session.Handler, *pubsub.Pu
|
||||
}
|
||||
|
||||
func newTargetHTTPServer() *httptest.Server {
|
||||
mux := api.MakeHandler(instanceID)
|
||||
mux := api.MakeHandler(mglog.NewMock(), instanceID)
|
||||
return httptest.NewServer(mux)
|
||||
}
|
||||
|
||||
func newProxyHTPPServer(svc session.Handler, targetServer *httptest.Server) (*httptest.Server, error) {
|
||||
mp, err := mproxy.NewProxy("", targetServer.URL, svc, mglog.NewMock())
|
||||
config := mproxy.Config{
|
||||
Address: "",
|
||||
Target: targetServer.URL,
|
||||
}
|
||||
mp, err := mproxyhttp.NewProxy(config, svc, mglog.NewMock())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return httptest.NewServer(http.HandlerFunc(mp.Handler)), nil
|
||||
return httptest.NewServer(http.HandlerFunc(mp.ServeHTTP)), nil
|
||||
}
|
||||
|
||||
type testRequest struct {
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
"github.com/absmach/magistrala/pkg/messaging"
|
||||
)
|
||||
|
||||
type publishReq struct {
|
||||
msg *messaging.Message
|
||||
token string
|
||||
}
|
||||
|
||||
func (req publishReq) validate() error {
|
||||
if req.token == "" {
|
||||
return apiutil.ErrBearerKey
|
||||
}
|
||||
if len(req.msg.Payload) == 0 {
|
||||
return apiutil.ErrEmptyMessage
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/absmach/magistrala"
|
||||
)
|
||||
|
||||
var _ magistrala.Response = (*publishMessageRes)(nil)
|
||||
|
||||
type publishMessageRes struct{}
|
||||
|
||||
func (res publishMessageRes) Code() int {
|
||||
return http.StatusAccepted
|
||||
}
|
||||
|
||||
func (res publishMessageRes) Headers() map[string]string {
|
||||
return map[string]string{}
|
||||
}
|
||||
|
||||
func (res publishMessageRes) Empty() bool {
|
||||
return true
|
||||
}
|
||||
+21
-62
@@ -5,19 +5,19 @@ package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
|
||||
"github.com/absmach/magistrala"
|
||||
"github.com/absmach/magistrala/internal/api"
|
||||
"github.com/absmach/magistrala/internal/apiutil"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
svcerr "github.com/absmach/magistrala/pkg/errors/service"
|
||||
"github.com/absmach/magistrala/pkg/messaging"
|
||||
"github.com/go-chi/chi/v5"
|
||||
kithttp "github.com/go-kit/kit/transport/http"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -26,26 +26,24 @@ const (
|
||||
contentType = "application/json"
|
||||
)
|
||||
|
||||
var errMalformedSubtopic = errors.New("malformed subtopic")
|
||||
|
||||
// MakeHandler returns a HTTP handler for API endpoints.
|
||||
func MakeHandler(instanceID string) http.Handler {
|
||||
func MakeHandler(logger *slog.Logger, instanceID string) http.Handler {
|
||||
opts := []kithttp.ServerOption{
|
||||
kithttp.ServerErrorEncoder(encodeError),
|
||||
kithttp.ServerErrorEncoder(apiutil.LoggingErrorEncoder(logger, api.EncodeError)),
|
||||
}
|
||||
|
||||
r := chi.NewRouter()
|
||||
r.Post("/channels/{chanID}/messages", otelhttp.NewHandler(kithttp.NewServer(
|
||||
sendMessageEndpoint(),
|
||||
decodeRequest,
|
||||
encodeResponse,
|
||||
api.EncodeResponse,
|
||||
opts...,
|
||||
), "publish").ServeHTTP)
|
||||
|
||||
r.Post("/channels/{chanID}/messages/*", otelhttp.NewHandler(kithttp.NewServer(
|
||||
sendMessageEndpoint(),
|
||||
decodeRequest,
|
||||
encodeResponse,
|
||||
api.EncodeResponse,
|
||||
opts...,
|
||||
), "publish").ServeHTTP)
|
||||
|
||||
@@ -61,62 +59,23 @@ func decodeRequest(_ context.Context, r *http.Request) (interface{}, error) {
|
||||
return nil, errors.Wrap(apiutil.ErrValidation, apiutil.ErrUnsupportedContentType)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func encodeResponse(_ context.Context, w http.ResponseWriter, _ interface{}) error {
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
return nil
|
||||
}
|
||||
|
||||
func encodeError(_ context.Context, err error, w http.ResponseWriter) {
|
||||
var wrapper error
|
||||
if errors.Contains(err, apiutil.ErrValidation) {
|
||||
wrapper, err = errors.Unwrap(err)
|
||||
}
|
||||
var req publishReq
|
||||
|
||||
_, pass, ok := r.BasicAuth()
|
||||
switch {
|
||||
case errors.Contains(err, svcerr.ErrAuthentication),
|
||||
errors.Contains(err, apiutil.ErrBearerKey),
|
||||
errors.Contains(err, apiutil.ErrBearerToken):
|
||||
w.WriteHeader(http.StatusUnauthorized)
|
||||
case errors.Contains(err, svcerr.ErrAuthorization):
|
||||
w.WriteHeader(http.StatusForbidden)
|
||||
case errors.Contains(err, apiutil.ErrUnsupportedContentType):
|
||||
w.WriteHeader(http.StatusUnsupportedMediaType)
|
||||
case errors.Contains(err, errMalformedSubtopic),
|
||||
errors.Contains(err, svcerr.ErrMalformedEntity):
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
|
||||
default:
|
||||
switch e, ok := status.FromError(err); {
|
||||
case ok:
|
||||
switch e.Code() {
|
||||
case codes.Unauthenticated:
|
||||
w.WriteHeader(http.StatusUnauthorized)
|
||||
case codes.PermissionDenied:
|
||||
w.WriteHeader(http.StatusForbidden)
|
||||
case codes.Internal:
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
case codes.NotFound:
|
||||
err = svcerr.ErrNotFound
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
default:
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
}
|
||||
default:
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
}
|
||||
case ok:
|
||||
req.token = pass
|
||||
case !ok:
|
||||
req.token = apiutil.ExtractThingKey(r)
|
||||
}
|
||||
|
||||
if wrapper != nil {
|
||||
err = errors.Wrap(wrapper, err)
|
||||
payload, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(apiutil.ErrValidation, errors.ErrMalformedEntity)
|
||||
}
|
||||
defer r.Body.Close()
|
||||
|
||||
if errorVal, ok := err.(errors.Error); ok {
|
||||
w.Header().Set("Content-Type", contentType)
|
||||
if err := json.NewEncoder(w).Encode(errorVal); err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
req.msg = &messaging.Message{Payload: payload}
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
@@ -152,7 +152,8 @@ func EncodeError(_ context.Context, err error, w http.ResponseWriter) {
|
||||
errors.Contains(err, apiutil.ErrInvalidContact),
|
||||
errors.Contains(err, apiutil.ErrInvalidTopic),
|
||||
errors.Contains(err, bootstrap.ErrAddBootstrap),
|
||||
errors.Contains(err, apiutil.ErrInvalidCertData):
|
||||
errors.Contains(err, apiutil.ErrInvalidCertData),
|
||||
errors.Contains(err, apiutil.ErrEmptyMessage):
|
||||
err = unwrap(err)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
|
||||
|
||||
@@ -173,4 +173,7 @@ var (
|
||||
|
||||
// ErrMissingTo indicates missing to value.
|
||||
ErrMissingTo = errors.New("missing to time value")
|
||||
|
||||
// ErrEmptyMessage indicates empty message.
|
||||
ErrEmptyMessage = errors.New("empty message")
|
||||
)
|
||||
|
||||
@@ -22,50 +22,50 @@ type loggingMiddleware struct {
|
||||
|
||||
// AuthConnect implements session.Handler.
|
||||
func (lm *loggingMiddleware) AuthConnect(ctx context.Context) (err error) {
|
||||
defer lm.logAction(ctx, "AuthConnect", nil, time.Now(), err)
|
||||
defer lm.logAction("AuthConnect", nil, time.Now(), err)
|
||||
return lm.svc.AuthConnect(ctx)
|
||||
}
|
||||
|
||||
// AuthPublish implements session.Handler.
|
||||
func (lm *loggingMiddleware) AuthPublish(ctx context.Context, topic *string, payload *[]byte) (err error) {
|
||||
defer lm.logAction(ctx, "AuthPublish", &[]string{*topic}, time.Now(), err)
|
||||
defer lm.logAction("AuthPublish", &[]string{*topic}, time.Now(), err)
|
||||
return lm.svc.AuthPublish(ctx, topic, payload)
|
||||
}
|
||||
|
||||
// AuthSubscribe implements session.Handler.
|
||||
func (lm *loggingMiddleware) AuthSubscribe(ctx context.Context, topics *[]string) (err error) {
|
||||
defer lm.logAction(ctx, "AuthSubscribe", topics, time.Now(), err)
|
||||
defer lm.logAction("AuthSubscribe", topics, time.Now(), err)
|
||||
return lm.svc.AuthSubscribe(ctx, topics)
|
||||
}
|
||||
|
||||
// Connect implements session.Handler.
|
||||
func (lm *loggingMiddleware) Connect(ctx context.Context) (err error) {
|
||||
defer lm.logAction(ctx, "Connect", nil, time.Now(), err)
|
||||
defer lm.logAction("Connect", nil, time.Now(), err)
|
||||
return lm.svc.Connect(ctx)
|
||||
}
|
||||
|
||||
// Disconnect implements session.Handler.
|
||||
func (lm *loggingMiddleware) Disconnect(ctx context.Context) (err error) {
|
||||
defer lm.logAction(ctx, "Disconnect", nil, time.Now(), err)
|
||||
defer lm.logAction("Disconnect", nil, time.Now(), err)
|
||||
return lm.svc.Disconnect(ctx)
|
||||
}
|
||||
|
||||
// Publish logs the publish request. It logs the time it took to complete the request.
|
||||
// If the request fails, it logs the error.
|
||||
func (lm *loggingMiddleware) Publish(ctx context.Context, topic *string, payload *[]byte) (err error) {
|
||||
defer lm.logAction(ctx, "Publish", &[]string{*topic}, time.Now(), err)
|
||||
defer lm.logAction("Publish", &[]string{*topic}, time.Now(), err)
|
||||
return lm.svc.Publish(ctx, topic, payload)
|
||||
}
|
||||
|
||||
// Subscribe implements session.Handler.
|
||||
func (lm *loggingMiddleware) Subscribe(ctx context.Context, topics *[]string) (err error) {
|
||||
defer lm.logAction(ctx, "Subscribe", topics, time.Now(), err)
|
||||
defer lm.logAction("Subscribe", topics, time.Now(), err)
|
||||
return lm.svc.Subscribe(ctx, topics)
|
||||
}
|
||||
|
||||
// Unsubscribe implements session.Handler.
|
||||
func (lm *loggingMiddleware) Unsubscribe(ctx context.Context, topics *[]string) (err error) {
|
||||
defer lm.logAction(ctx, "Unsubscribe", topics, time.Now(), err)
|
||||
defer lm.logAction("Unsubscribe", topics, time.Now(), err)
|
||||
return lm.svc.Unsubscribe(ctx, topics)
|
||||
}
|
||||
|
||||
@@ -74,7 +74,7 @@ func LoggingMiddleware(svc session.Handler, logger *slog.Logger) session.Handler
|
||||
return &loggingMiddleware{logger, svc}
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) logAction(ctx context.Context, action string, topics *[]string, t time.Time, err error) {
|
||||
func (lm *loggingMiddleware) logAction(action string, topics *[]string, t time.Time, err error) {
|
||||
args := []any{
|
||||
slog.String("duration", time.Since(t).String()),
|
||||
}
|
||||
|
||||
@@ -19,7 +19,8 @@ import (
|
||||
svcerr "github.com/absmach/magistrala/pkg/errors/service"
|
||||
pubsub "github.com/absmach/magistrala/pkg/messaging/mocks"
|
||||
sdk "github.com/absmach/magistrala/pkg/sdk/go"
|
||||
mproxy "github.com/absmach/mproxy/pkg/http"
|
||||
"github.com/absmach/mproxy"
|
||||
mproxyhttp "github.com/absmach/mproxy/pkg/http"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
@@ -29,15 +30,19 @@ func setupMessages() (*httptest.Server, *authmocks.AuthClient, *pubsub.PubSub) {
|
||||
pub := new(pubsub.PubSub)
|
||||
handler := adapter.NewHandler(pub, mglog.NewMock(), auth)
|
||||
|
||||
mux := api.MakeHandler("")
|
||||
mux := api.MakeHandler(mglog.NewMock(), "")
|
||||
target := httptest.NewServer(mux)
|
||||
|
||||
mp, err := mproxy.NewProxy("", target.URL, handler, mglog.NewMock())
|
||||
config := mproxy.Config{
|
||||
Address: "",
|
||||
Target: target.URL,
|
||||
}
|
||||
mp, err := mproxyhttp.NewProxy(config, handler, mglog.NewMock())
|
||||
if err != nil {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
return httptest.NewServer(http.HandlerFunc(mp.Handler)), auth, pub
|
||||
return httptest.NewServer(http.HandlerFunc(mp.ServeHTTP)), auth, pub
|
||||
}
|
||||
|
||||
func TestSendMessage(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user