mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 07:10:19 +00:00
NOISSUE - Switch from mProxy to mGate (#2500)
Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>
This commit is contained in:
+5
-5
@@ -29,9 +29,9 @@ import (
|
||||
"github.com/absmach/magistrala/pkg/server"
|
||||
httpserver "github.com/absmach/magistrala/pkg/server/http"
|
||||
"github.com/absmach/magistrala/pkg/uuid"
|
||||
"github.com/absmach/mproxy"
|
||||
mproxyhttp "github.com/absmach/mproxy/pkg/http"
|
||||
"github.com/absmach/mproxy/pkg/session"
|
||||
"github.com/absmach/mgate"
|
||||
mgatehttp "github.com/absmach/mgate/pkg/http"
|
||||
"github.com/absmach/mgate/pkg/session"
|
||||
"github.com/caarlos0/env/v11"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"golang.org/x/sync/errgroup"
|
||||
@@ -163,7 +163,7 @@ func newService(pub messaging.Publisher, tc magistrala.ThingsServiceClient, logg
|
||||
}
|
||||
|
||||
func proxyHTTP(ctx context.Context, cfg server.Config, logger *slog.Logger, sessionHandler session.Handler) error {
|
||||
config := mproxy.Config{
|
||||
config := mgate.Config{
|
||||
Address: fmt.Sprintf("%s:%s", "", cfg.Port),
|
||||
Target: fmt.Sprintf("%s:%s", targetHTTPHost, targetHTTPPort),
|
||||
PathPrefix: "/",
|
||||
@@ -177,7 +177,7 @@ func proxyHTTP(ctx context.Context, cfg server.Config, logger *slog.Logger, sess
|
||||
Certificates: []tls.Certificate{tlsCert},
|
||||
}
|
||||
}
|
||||
mp, err := mproxyhttp.NewProxy(config, sessionHandler, logger)
|
||||
mp, err := mgatehttp.NewProxy(config, sessionHandler, logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
+7
-7
@@ -32,10 +32,10 @@ import (
|
||||
mqttpub "github.com/absmach/magistrala/pkg/messaging/mqtt"
|
||||
"github.com/absmach/magistrala/pkg/server"
|
||||
"github.com/absmach/magistrala/pkg/uuid"
|
||||
"github.com/absmach/mproxy"
|
||||
mproxymqtt "github.com/absmach/mproxy/pkg/mqtt"
|
||||
"github.com/absmach/mproxy/pkg/mqtt/websocket"
|
||||
"github.com/absmach/mproxy/pkg/session"
|
||||
mgate "github.com/absmach/mgate"
|
||||
mgatemqtt "github.com/absmach/mgate/pkg/mqtt"
|
||||
"github.com/absmach/mgate/pkg/mqtt/websocket"
|
||||
"github.com/absmach/mgate/pkg/session"
|
||||
"github.com/caarlos0/env/v11"
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"golang.org/x/sync/errgroup"
|
||||
@@ -211,11 +211,11 @@ func main() {
|
||||
}
|
||||
|
||||
func proxyMQTT(ctx context.Context, cfg config, logger *slog.Logger, sessionHandler session.Handler, interceptor session.Interceptor) error {
|
||||
config := mproxy.Config{
|
||||
config := mgate.Config{
|
||||
Address: fmt.Sprintf(":%s", cfg.MQTTPort),
|
||||
Target: fmt.Sprintf("%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort),
|
||||
}
|
||||
mproxy := mproxymqtt.New(config, sessionHandler, interceptor, logger)
|
||||
mproxy := mgatemqtt.New(config, sessionHandler, interceptor, logger)
|
||||
|
||||
errCh := make(chan error)
|
||||
go func() {
|
||||
@@ -232,7 +232,7 @@ func proxyMQTT(ctx context.Context, cfg config, logger *slog.Logger, sessionHand
|
||||
}
|
||||
|
||||
func proxyWS(ctx context.Context, cfg config, logger *slog.Logger, sessionHandler session.Handler, interceptor session.Interceptor) error {
|
||||
config := mproxy.Config{
|
||||
config := mgate.Config{
|
||||
Address: fmt.Sprintf("%s:%s", "", cfg.HTTPPort),
|
||||
Target: fmt.Sprintf("ws://%s:%s%s", cfg.HTTPTargetHost, cfg.HTTPTargetPort, wsPathPrefix),
|
||||
PathPrefix: wsPathPrefix,
|
||||
|
||||
+2
-2
@@ -27,8 +27,8 @@ import (
|
||||
"github.com/absmach/magistrala/ws"
|
||||
"github.com/absmach/magistrala/ws/api"
|
||||
"github.com/absmach/magistrala/ws/tracing"
|
||||
"github.com/absmach/mproxy/pkg/session"
|
||||
"github.com/absmach/mproxy/pkg/websockets"
|
||||
"github.com/absmach/mgate/pkg/session"
|
||||
"github.com/absmach/mgate/pkg/websockets"
|
||||
"github.com/caarlos0/env/v11"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
@@ -8,7 +8,7 @@ require (
|
||||
github.com/0x6flab/namegenerator v1.4.0
|
||||
github.com/absmach/callhome v0.14.0
|
||||
github.com/absmach/certs v0.0.0-20241014135535-3f118b801054
|
||||
github.com/absmach/mproxy v0.4.3-0.20240712131952-28f88581126a
|
||||
github.com/absmach/mgate v0.4.5
|
||||
github.com/absmach/senml v1.0.5
|
||||
github.com/authzed/authzed-go v1.1.0
|
||||
github.com/authzed/grpcutil v0.0.0-20240123194739-2ea1e3d2d98b
|
||||
|
||||
@@ -21,8 +21,8 @@ github.com/absmach/callhome v0.14.0 h1:zB4tIZJ1YUmZ1VGHFPfMA/Lo6/Mv19y2dvoOiXj2B
|
||||
github.com/absmach/callhome v0.14.0/go.mod h1:l12UJOfibK4Muvg/AbupHuquNV9qSz/ROdTEPg7f2Vk=
|
||||
github.com/absmach/certs v0.0.0-20241014135535-3f118b801054 h1:NsIwp+ueKxDx8XftruA4hz8WUgyWq7eBE344nJt0LJg=
|
||||
github.com/absmach/certs v0.0.0-20241014135535-3f118b801054/go.mod h1:bEAb/HjPztlrMmz8dLeJTke4Tzu9yW3+hY5eldEUtSY=
|
||||
github.com/absmach/mproxy v0.4.3-0.20240712131952-28f88581126a h1:3JtJSekVHb02U3NmIJa5f3a1I15aczKrBBCczGGCbxM=
|
||||
github.com/absmach/mproxy v0.4.3-0.20240712131952-28f88581126a/go.mod h1:Nevip6o8u5Zx7l3LTtN8BwlCI5h5KpsnI9YnAxF5RT8=
|
||||
github.com/absmach/mgate v0.4.5 h1:l6RmrEsR9jxkdb9WHUSecmT0HA41TkZZQVffFfUAIfI=
|
||||
github.com/absmach/mgate v0.4.5/go.mod h1:IvRIHZexZPEIAPmmaJF0L5DY2ERjj+GxRGitOW4s6qo=
|
||||
github.com/absmach/senml v1.0.5 h1:zNPRYpGr2Wsb8brAusz8DIfFqemy1a2dNbmMnegY3GE=
|
||||
github.com/absmach/senml v1.0.5/go.mod h1:NDEjk3O4V4YYu9Bs2/+t/AZ/F+0wu05ikgecp+/FsSU=
|
||||
github.com/authzed/authzed-go v1.1.0 h1:aFy5mIwe9HzaRss0KmDXBhwAAN2LWIEoRNcPXTaLv8Y=
|
||||
|
||||
@@ -18,9 +18,9 @@ import (
|
||||
"github.com/absmach/magistrala/pkg/apiutil"
|
||||
pubsub "github.com/absmach/magistrala/pkg/messaging/mocks"
|
||||
thmocks "github.com/absmach/magistrala/things/mocks"
|
||||
"github.com/absmach/mproxy"
|
||||
mproxyhttp "github.com/absmach/mproxy/pkg/http"
|
||||
"github.com/absmach/mproxy/pkg/session"
|
||||
"github.com/absmach/mgate"
|
||||
proxy "github.com/absmach/mgate/pkg/http"
|
||||
"github.com/absmach/mgate/pkg/session"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
@@ -41,11 +41,11 @@ func newTargetHTTPServer() *httptest.Server {
|
||||
}
|
||||
|
||||
func newProxyHTPPServer(svc session.Handler, targetServer *httptest.Server) (*httptest.Server, error) {
|
||||
config := mproxy.Config{
|
||||
config := mgate.Config{
|
||||
Address: "",
|
||||
Target: targetServer.URL,
|
||||
}
|
||||
mp, err := mproxyhttp.NewProxy(config, svc, mglog.NewMock())
|
||||
mp, err := proxy.NewProxy(config, svc, mglog.NewMock())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -151,7 +151,7 @@ func TestPublish(t *testing.T) {
|
||||
msg: msg,
|
||||
contentType: ctSenmlJSON,
|
||||
key: invalidKey,
|
||||
status: http.StatusBadRequest,
|
||||
status: http.StatusUnauthorized,
|
||||
},
|
||||
"publish message with invalid basic auth": {
|
||||
chanID: chanID,
|
||||
@@ -159,7 +159,7 @@ func TestPublish(t *testing.T) {
|
||||
contentType: ctSenmlJSON,
|
||||
key: invalidKey,
|
||||
basicAuth: true,
|
||||
status: http.StatusBadRequest,
|
||||
status: http.StatusUnauthorized,
|
||||
},
|
||||
"publish message without content type": {
|
||||
chanID: chanID,
|
||||
|
||||
+18
-25
@@ -7,6 +7,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strings"
|
||||
@@ -18,7 +19,8 @@ import (
|
||||
svcerr "github.com/absmach/magistrala/pkg/errors/service"
|
||||
"github.com/absmach/magistrala/pkg/messaging"
|
||||
"github.com/absmach/magistrala/pkg/policies"
|
||||
"github.com/absmach/mproxy/pkg/session"
|
||||
mgate "github.com/absmach/mgate/pkg/http"
|
||||
"github.com/absmach/mgate/pkg/session"
|
||||
)
|
||||
|
||||
var _ session.Handler = (*handler)(nil)
|
||||
@@ -33,13 +35,13 @@ const (
|
||||
|
||||
// Error wrappers for MQTT errors.
|
||||
var (
|
||||
errMalformedSubtopic = errors.New("malformed subtopic")
|
||||
errClientNotInitialized = errors.New("client is not initialized")
|
||||
errMalformedTopic = errors.New("malformed topic")
|
||||
errMissingTopicPub = errors.New("failed to publish due to missing topic")
|
||||
errFailedPublish = errors.New("failed to publish")
|
||||
errFailedParseSubtopic = errors.New("failed to parse subtopic")
|
||||
errFailedPublishToMsgBroker = errors.New("failed to publish to magistrala message broker")
|
||||
errMalformedSubtopic = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("malformed subtopic"))
|
||||
errMalformedTopic = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("malformed topic"))
|
||||
errMissingTopicPub = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("failed to publish due to missing topic"))
|
||||
errFailedParseSubtopic = mgate.NewHTTPProxyError(http.StatusBadRequest, errors.New("failed to parse subtopic"))
|
||||
)
|
||||
|
||||
var channelRegExp = regexp.MustCompile(`^\/?channels\/([\w\-]+)\/messages(\/[^?]*)?(\?.*)?$`)
|
||||
@@ -71,9 +73,9 @@ func (h *handler) AuthConnect(ctx context.Context) error {
|
||||
var tok string
|
||||
switch {
|
||||
case string(s.Password) == "":
|
||||
return errors.Wrap(apiutil.ErrValidation, apiutil.ErrBearerKey)
|
||||
case strings.HasPrefix(string(s.Password), "Thing"):
|
||||
tok = extractThingKey(string(s.Password))
|
||||
return mgate.NewHTTPProxyError(http.StatusBadRequest, errors.Wrap(apiutil.ErrValidation, apiutil.ErrBearerKey))
|
||||
case strings.HasPrefix(string(s.Password), apiutil.ThingPrefix):
|
||||
tok = strings.TrimPrefix(string(s.Password), apiutil.ThingPrefix)
|
||||
default:
|
||||
tok = string(s.Password)
|
||||
}
|
||||
@@ -113,7 +115,7 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
|
||||
|
||||
channelParts := channelRegExp.FindStringSubmatch(*topic)
|
||||
if len(channelParts) < 2 {
|
||||
return errors.Wrap(errFailedPublish, errMalformedTopic)
|
||||
return mgate.NewHTTPProxyError(http.StatusBadRequest, errors.Wrap(errFailedPublish, errMalformedTopic))
|
||||
}
|
||||
|
||||
chanID := channelParts[1]
|
||||
@@ -121,7 +123,7 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
|
||||
|
||||
subtopic, err := parseSubtopic(subtopic)
|
||||
if err != nil {
|
||||
return errors.Wrap(errFailedParseSubtopic, err)
|
||||
return mgate.NewHTTPProxyError(http.StatusBadRequest, errors.Wrap(errFailedParseSubtopic, err))
|
||||
}
|
||||
|
||||
msg := messaging.Message{
|
||||
@@ -135,8 +137,8 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
|
||||
switch {
|
||||
case string(s.Password) == "":
|
||||
return errors.Wrap(apiutil.ErrValidation, apiutil.ErrBearerKey)
|
||||
case strings.HasPrefix(string(s.Password), "Thing"):
|
||||
tok = extractThingKey(string(s.Password))
|
||||
case strings.HasPrefix(string(s.Password), apiutil.ThingPrefix):
|
||||
tok = strings.TrimPrefix(string(s.Password), apiutil.ThingPrefix)
|
||||
default:
|
||||
tok = string(s.Password)
|
||||
}
|
||||
@@ -147,10 +149,10 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
|
||||
}
|
||||
res, err := h.things.Authorize(ctx, ar)
|
||||
if err != nil {
|
||||
return err
|
||||
return mgate.NewHTTPProxyError(http.StatusBadRequest, err)
|
||||
}
|
||||
if !res.GetAuthorized() {
|
||||
return svcerr.ErrAuthorization
|
||||
return mgate.NewHTTPProxyError(http.StatusUnauthorized, svcerr.ErrAuthorization)
|
||||
}
|
||||
msg.Publisher = res.GetId()
|
||||
|
||||
@@ -183,7 +185,7 @@ func parseSubtopic(subtopic string) (string, error) {
|
||||
|
||||
subtopic, err := url.QueryUnescape(subtopic)
|
||||
if err != nil {
|
||||
return "", errMalformedSubtopic
|
||||
return "", mgate.NewHTTPProxyError(http.StatusBadRequest, errMalformedSubtopic)
|
||||
}
|
||||
subtopic = strings.ReplaceAll(subtopic, "/", ".")
|
||||
|
||||
@@ -195,7 +197,7 @@ func parseSubtopic(subtopic string) (string, error) {
|
||||
}
|
||||
|
||||
if len(elem) > 1 && (strings.Contains(elem, "*") || strings.Contains(elem, ">")) {
|
||||
return "", errMalformedSubtopic
|
||||
return "", mgate.NewHTTPProxyError(http.StatusBadRequest, errMalformedSubtopic)
|
||||
}
|
||||
|
||||
filteredElems = append(filteredElems, elem)
|
||||
@@ -204,12 +206,3 @@ func parseSubtopic(subtopic string) (string, error) {
|
||||
subtopic = strings.Join(filteredElems, ".")
|
||||
return subtopic, nil
|
||||
}
|
||||
|
||||
// extractThingKey returns value of the thing key. If there is no thing key - an empty value is returned.
|
||||
func extractThingKey(topic string) string {
|
||||
if !strings.HasPrefix(topic, apiutil.ThingPrefix) {
|
||||
return ""
|
||||
}
|
||||
|
||||
return strings.TrimPrefix(topic, apiutil.ThingPrefix)
|
||||
}
|
||||
|
||||
+1
-1
@@ -18,7 +18,7 @@ import (
|
||||
svcerr "github.com/absmach/magistrala/pkg/errors/service"
|
||||
"github.com/absmach/magistrala/pkg/messaging"
|
||||
"github.com/absmach/magistrala/pkg/policies"
|
||||
"github.com/absmach/mproxy/pkg/session"
|
||||
"github.com/absmach/mgate/pkg/session"
|
||||
)
|
||||
|
||||
var _ session.Handler = (*handler)(nil)
|
||||
|
||||
@@ -18,7 +18,7 @@ import (
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
svcerr "github.com/absmach/magistrala/pkg/errors/service"
|
||||
thmocks "github.com/absmach/magistrala/things/mocks"
|
||||
"github.com/absmach/mproxy/pkg/session"
|
||||
"github.com/absmach/mgate/pkg/session"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/absmach/mproxy/pkg/session"
|
||||
"github.com/absmach/mgate/pkg/session"
|
||||
)
|
||||
|
||||
var _ session.Handler = (*loggingMiddleware)(nil)
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/absmach/mproxy/pkg/session"
|
||||
"github.com/absmach/mgate/pkg/session"
|
||||
"github.com/go-kit/kit/metrics"
|
||||
)
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ package handler
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/absmach/mproxy/pkg/session"
|
||||
"github.com/absmach/mgate/pkg/session"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
@@ -24,8 +24,8 @@ import (
|
||||
readersapi "github.com/absmach/magistrala/readers/api"
|
||||
readersmocks "github.com/absmach/magistrala/readers/mocks"
|
||||
thmocks "github.com/absmach/magistrala/things/mocks"
|
||||
"github.com/absmach/mproxy"
|
||||
mproxyhttp "github.com/absmach/mproxy/pkg/http"
|
||||
"github.com/absmach/mgate"
|
||||
proxy "github.com/absmach/mgate/pkg/http"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
@@ -38,11 +38,11 @@ func setupMessages() (*httptest.Server, *thmocks.ThingsServiceClient, *pubsub.Pu
|
||||
mux := api.MakeHandler(mglog.NewMock(), "")
|
||||
target := httptest.NewServer(mux)
|
||||
|
||||
config := mproxy.Config{
|
||||
config := mgate.Config{
|
||||
Address: "",
|
||||
Target: target.URL,
|
||||
}
|
||||
mp, err := mproxyhttp.NewProxy(config, handler, mglog.NewMock())
|
||||
mp, err := proxy.NewProxy(config, handler, mglog.NewMock())
|
||||
if err != nil {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
@@ -18,8 +18,8 @@ import (
|
||||
thmocks "github.com/absmach/magistrala/things/mocks"
|
||||
"github.com/absmach/magistrala/ws"
|
||||
"github.com/absmach/magistrala/ws/api"
|
||||
"github.com/absmach/mproxy/pkg/session"
|
||||
"github.com/absmach/mproxy/pkg/websockets"
|
||||
"github.com/absmach/mgate/pkg/session"
|
||||
"github.com/absmach/mgate/pkg/websockets"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
||||
+1
-1
@@ -17,7 +17,7 @@ import (
|
||||
svcerr "github.com/absmach/magistrala/pkg/errors/service"
|
||||
"github.com/absmach/magistrala/pkg/messaging"
|
||||
"github.com/absmach/magistrala/pkg/policies"
|
||||
"github.com/absmach/mproxy/pkg/session"
|
||||
"github.com/absmach/mgate/pkg/session"
|
||||
)
|
||||
|
||||
var _ session.Handler = (*handler)(nil)
|
||||
|
||||
Reference in New Issue
Block a user