SMQ-2977 - Add cache for message topic caching (#3000)

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>
This commit is contained in:
Felix Gateru
2025-07-15 11:46:45 +03:00
committed by GitHub
parent d4dc7ffe35
commit 5669e8a76e
27 changed files with 930 additions and 484 deletions
+15 -2
View File
@@ -37,6 +37,7 @@ const (
svcName = "coap_adapter"
envPrefix = "SMQ_COAP_ADAPTER_"
envPrefixHTTP = "SMQ_COAP_ADAPTER_HTTP_"
envPrefixCache = "SMQ_COAP_CACHE_"
envPrefixClients = "SMQ_CLIENTS_GRPC_"
envPrefixChannels = "SMQ_CHANNELS_GRPC_"
envPrefixDomains = "SMQ_DOMAINS_GRPC_"
@@ -93,6 +94,13 @@ func main() {
return
}
cacheConfig := messaging.CacheConfig{}
if err := env.ParseWithOptions(&cacheConfig, env.Options{Prefix: envPrefixCache}); err != nil {
logger.Error(fmt.Sprintf("failed to load cache configuration : %s", err))
exitCode = 1
return
}
domsGrpcCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&domsGrpcCfg, env.Options{Prefix: envPrefixDomains}); err != nil {
logger.Error(fmt.Sprintf("failed to load domains gRPC client configuration : %s", err))
@@ -182,8 +190,13 @@ func main() {
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(cfg.InstanceID), logger)
resolver := messaging.NewTopicResolver(channelsClient, domainsClient)
cs := coapserver.NewServer(ctx, cancel, svcName, coapServerConfig, httpapi.MakeCoAPHandler(svc, channelsClient, resolver, logger), logger)
parser, err := messaging.NewTopicParser(cacheConfig, channelsClient, domainsClient)
if err != nil {
logger.Error(fmt.Sprintf("failed to create topic parsers: %s", err))
exitCode = 1
return
}
cs := coapserver.NewServer(ctx, cancel, svcName, coapServerConfig, httpapi.MakeCoAPHandler(svc, channelsClient, parser, logger), logger)
if cfg.SendTelemetry {
chc := chclient.New(svcName, supermq.Version, logger, cancel)
+21 -5
View File
@@ -47,6 +47,7 @@ import (
const (
svcName = "http_adapter"
envPrefix = "SMQ_HTTP_ADAPTER_"
envPrefixCache = "SMQ_HTTP_ADAPTER_CACHE_"
envPrefixClients = "SMQ_CLIENTS_GRPC_"
envPrefixChannels = "SMQ_CHANNELS_GRPC_"
envPrefixAuth = "SMQ_AUTH_GRPC_"
@@ -100,6 +101,13 @@ func main() {
return
}
cacheConfig := messaging.CacheConfig{}
if err := env.ParseWithOptions(&cacheConfig, env.Options{Prefix: envPrefixCache}); err != nil {
logger.Error(fmt.Sprintf("failed to load cache configuration : %s", err))
exitCode = 1
return
}
domsGrpcCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&domsGrpcCfg, env.Options{Prefix: envPrefixDomains}); err != nil {
logger.Error(fmt.Sprintf("failed to load domains gRPC client configuration : %s", err))
@@ -193,7 +201,12 @@ func main() {
return
}
svc := newService(pub, authn, clientsClient, channelsClient, domainsClient, logger, tracer)
svc, err := newService(pub, authn, cacheConfig, clientsClient, channelsClient, domainsClient, logger, tracer)
if err != nil {
logger.Error(fmt.Sprintf("failed to create service: %s", err))
exitCode = 1
return
}
targetServerCfg := server.Config{Port: targetHTTPPort}
hs := httpserver.NewServer(ctx, cancel, svcName, targetServerCfg, httpapi.MakeHandler(logger, cfg.InstanceID), logger)
@@ -220,14 +233,17 @@ func main() {
}
}
func newService(pub messaging.Publisher, authn smqauthn.Authentication, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, domains grpcDomainsV1.DomainsServiceClient, logger *slog.Logger, tracer trace.Tracer) session.Handler {
resolver := messaging.NewTopicResolver(channels, domains)
svc := adapter.NewHandler(pub, authn, clients, channels, resolver, logger)
func newService(pub messaging.Publisher, authn smqauthn.Authentication, cacheCfg messaging.CacheConfig, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, domains grpcDomainsV1.DomainsServiceClient, logger *slog.Logger, tracer trace.Tracer) (session.Handler, error) {
parser, err := messaging.NewTopicParser(cacheCfg, channels, domains)
if err != nil {
return nil, err
}
svc := adapter.NewHandler(pub, authn, clients, channels, parser, logger)
svc = handler.NewTracing(tracer, svc)
svc = handler.LoggingMiddleware(svc, logger)
counter, latency := prometheus.MakeMetrics(svcName, "api")
svc = handler.MetricsMiddleware(svc, counter, latency)
return svc
return svc, nil
}
func proxyHTTP(ctx context.Context, cfg server.Config, logger *slog.Logger, sessionHandler session.Handler) error {
+15 -1
View File
@@ -47,6 +47,7 @@ import (
const (
svcName = "mqtt"
envPrefixCache = "SMQ_MQTT_ADAPTER_CACHE_"
envPrefixClients = "SMQ_CLIENTS_GRPC_"
envPrefixChannels = "SMQ_CHANNELS_GRPC_"
envPrefixDomains = "SMQ_DOMAINS_GRPC_"
@@ -221,7 +222,20 @@ func main() {
defer channelsHandler.Close()
logger.Info("Channels service gRPC client successfully connected to channels gRPC server " + channelsHandler.Secure())
h := mqtt.NewHandler(np, logger, clientsClient, channelsClient)
cacheConfig := messaging.CacheConfig{}
if err := env.ParseWithOptions(&cacheConfig, env.Options{Prefix: envPrefixCache}); err != nil {
logger.Error(fmt.Sprintf("failed to load cache configuration : %s", err))
exitCode = 1
return
}
parser, err := messaging.NewTopicParser(cacheConfig, channelsClient, domainsClient)
if err != nil {
logger.Error(fmt.Sprintf("failed to create topic parsers: %s", err))
exitCode = 1
return
}
h := mqtt.NewHandler(np, logger, clientsClient, channelsClient, parser)
h, err = events.NewEventStoreMiddleware(ctx, h, cfg.ESURL, cfg.Instance)
if err != nil {
+15 -1
View File
@@ -43,6 +43,7 @@ import (
const (
svcName = "ws-adapter"
envPrefixHTTP = "SMQ_WS_ADAPTER_HTTP_"
envPrefixCache = "SMQ_WS_ADAPTER_CACHE_"
envPrefixClients = "SMQ_CLIENTS_GRPC_"
envPrefixChannels = "SMQ_CHANNELS_GRPC_"
envPrefixAuth = "SMQ_AUTH_GRPC_"
@@ -195,6 +196,19 @@ func main() {
}
resolver := messaging.NewTopicResolver(channelsClient, domainsClient)
cacheConfig := messaging.CacheConfig{}
if err := env.ParseWithOptions(&cacheConfig, env.Options{Prefix: envPrefixCache}); err != nil {
logger.Error(fmt.Sprintf("failed to load cache configuration : %s", err))
exitCode = 1
return
}
parser, err := messaging.NewTopicParser(cacheConfig, channelsClient, domainsClient)
if err != nil {
logger.Error(fmt.Sprintf("failed to create topic parser: %s", err))
exitCode = 1
return
}
svc := newService(clientsClient, channelsClient, nps, logger, tracer)
hs := httpserver.NewServer(ctx, cancel, svcName, targetServerConfig, httpapi.MakeHandler(ctx, svc, resolver, logger, cfg.InstanceID), logger)
@@ -209,7 +223,7 @@ func main() {
})
g.Go(func() error {
handler := ws.NewHandler(nps, logger, authn, clientsClient, channelsClient, resolver)
handler := ws.NewHandler(nps, logger, authn, clientsClient, channelsClient, parser)
return proxyWS(ctx, httpServerConfig, targetServerConfig, logger, handler)
})
+27 -21
View File
@@ -6,27 +6,30 @@ SuperMQ CoAP adapter provides an [CoAP](http://coap.technology/) API for sending
The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values.
| Variable | Description | Default |
| --------------------------------- | ----------------------------------------------------------------------------------- | ----------------------------------- |
| SMQ_COAP_ADAPTER_LOG_LEVEL | Log level for the CoAP Adapter (debug, info, warn, error) | info |
| SMQ_COAP_ADAPTER_HOST | CoAP service listening host | "" |
| SMQ_COAP_ADAPTER_PORT | CoAP service listening port | 5683 |
| SMQ_COAP_ADAPTER_SERVER_CERT | CoAP service server certificate | "" |
| SMQ_COAP_ADAPTER_SERVER_KEY | CoAP service server key | "" |
| SMQ_COAP_ADAPTER_HTTP_HOST | Service HTTP listening host | "" |
| SMQ_COAP_ADAPTER_HTTP_PORT | Service listening port | 5683 |
| SMQ_COAP_ADAPTER_HTTP_SERVER_CERT | Service server certificate | "" |
| SMQ_COAP_ADAPTER_HTTP_SERVER_KEY | Service server key | "" |
| SMQ_CLIENTS_GRPC_URL | Clients service Auth gRPC URL | <localhost:7000> |
| SMQ_CLIENTS_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s |
| SMQ_CLIENTS_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" |
| SMQ_CLIENTS_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" |
| SMQ_CLIENTS_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" |
| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | <amqp://guest:guest@rabbitmq:5672/> |
| SMQ_JAEGER_URL | Jaeger server URL | <http://localhost:4318/v1/traces> |
| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 |
| SMQ_SEND_TELEMETRY | Send telemetry to magistrala call home server | true |
| SMQ_COAP_ADAPTER_INSTANCE_ID | CoAP adapter instance ID | "" |
| Variable | Description | Default |
| ----------------------------------- | ----------------------------------------------------------------------------------- | ----------------------------------- |
| SMQ_COAP_ADAPTER_LOG_LEVEL | Log level for the CoAP Adapter (debug, info, warn, error) | info |
| SMQ_COAP_ADAPTER_HOST | CoAP service listening host | "" |
| SMQ_COAP_ADAPTER_PORT | CoAP service listening port | 5683 |
| SMQ_COAP_ADAPTER_SERVER_CERT | CoAP service server certificate | "" |
| SMQ_COAP_ADAPTER_SERVER_KEY | CoAP service server key | "" |
| SMQ_COAP_ADAPTER_HTTP_HOST | Service HTTP listening host | "" |
| SMQ_COAP_ADAPTER_HTTP_PORT | Service listening port | 5683 |
| SMQ_COAP_ADAPTER_HTTP_SERVER_CERT | Service server certificate | "" |
| SMQ_COAP_ADAPTER_HTTP_SERVER_KEY | Service server key | "" |
| SMQ_COAP_ADAPTER_CACHE_NUM_COUNTERS | Number of cache counters to keep that hold access frequency information | 200000 |
| SMQ_COAP_ADAPTER_CACHE_MAX_COST | Maximum size of the cache(in bytes) | 1048576 |
| SMQ_COAP_ADAPTER_CACHE_BUFFER_ITEMS | Number of cache `Get` buffers | 64 |
| SMQ_CLIENTS_GRPC_URL | Clients service Auth gRPC URL | <localhost:7000> |
| SMQ_CLIENTS_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s |
| SMQ_CLIENTS_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" |
| SMQ_CLIENTS_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" |
| SMQ_CLIENTS_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" |
| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | <amqp://guest:guest@rabbitmq:5672/> |
| SMQ_JAEGER_URL | Jaeger server URL | <http://localhost:4318/v1/traces> |
| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 |
| SMQ_SEND_TELEMETRY | Send telemetry to magistrala call home server | true |
| SMQ_COAP_ADAPTER_INSTANCE_ID | CoAP adapter instance ID | "" |
## Deployment
@@ -57,6 +60,9 @@ SMQ_COAP_ADAPTER_HTTP_HOST=localhost \
SMQ_COAP_ADAPTER_HTTP_PORT=5683 \
SMQ_COAP_ADAPTER_HTTP_SERVER_CERT="" \
SMQ_COAP_ADAPTER_HTTP_SERVER_KEY="" \
SMQ_COAP_ADAPTER_CACHE_NUM_COUNTERS=200000 \
SMQ_COAP_ADAPTER_CACHE_MAX_COST=1048576 \
SMQ_COAP_ADAPTER_CACHE_BUFFER_ITEMS=64 \
SMQ_CLIENTS_GRPC_URL=localhost:7000 \
SMQ_CLIENTS_GRPC_TIMEOUT=1s \
SMQ_CLIENTS_GRPC_CLIENT_CERT="" \
+6 -11
View File
@@ -50,16 +50,16 @@ type CoAPHandler struct {
logger *slog.Logger
service coap.Service
channels grpcChannelsV1.ChannelsServiceClient
resolver messaging.TopicResolver
parser messaging.TopicParser
}
// MakeCoAPHandler creates handler for CoAP messages.
func MakeCoAPHandler(svc coap.Service, channelsClient grpcChannelsV1.ChannelsServiceClient, resolver messaging.TopicResolver, l *slog.Logger) mux.Handler {
func MakeCoAPHandler(svc coap.Service, channelsClient grpcChannelsV1.ChannelsServiceClient, parser messaging.TopicParser, l *slog.Logger) mux.Handler {
return &CoAPHandler{
logger: l,
service: svc,
channels: channelsClient,
resolver: resolver,
parser: parser,
}
}
@@ -138,22 +138,17 @@ func (h *CoAPHandler) decodeMessage(msg *mux.Message) (*messaging.Message, error
return &messaging.Message{}, err
}
var domain, channel, subTopic string
var domainID, channelID, subTopic string
switch msg.Code() {
case codes.GET:
domain, channel, subTopic, err = messaging.ParseSubscribeTopic(path)
domainID, channelID, subTopic, err = h.parser.ParseSubscribeTopic(msg.Context(), path, true)
case codes.POST:
domain, channel, subTopic, err = messaging.ParsePublishTopic(path)
domainID, channelID, subTopic, err = h.parser.ParsePublishTopic(msg.Context(), path, true)
}
if err != nil {
return &messaging.Message{}, err
}
domainID, channelID, err := h.resolver.Resolve(msg.Context(), domain, channel)
if err != nil {
return &messaging.Message{}, err
}
ret := &messaging.Message{
Protocol: protocol,
Domain: domainID,
+12
View File
@@ -377,6 +377,9 @@ SMQ_HTTP_ADAPTER_HOST=http-adapter
SMQ_HTTP_ADAPTER_PORT=8008
SMQ_HTTP_ADAPTER_SERVER_CERT=
SMQ_HTTP_ADAPTER_SERVER_KEY=
SMQ_HTTP_ADAPTER_CACHE_NUM_COUNTERS=200000
SMQ_HTTP_ADAPTER_CACHE_MAX_COST=1048576
SMQ_HTTP_ADAPTER_CACHE_BUFFER_ITEMS=64
SMQ_HTTP_ADAPTER_INSTANCE_ID=
### MQTT
@@ -387,6 +390,9 @@ SMQ_MQTT_ADAPTER_WS_PORT=8080
SMQ_MQTT_ADAPTER_INSTANCE=
SMQ_MQTT_ADAPTER_INSTANCE_ID=
SMQ_MQTT_ADAPTER_ES_DB=0
SMQ_MQTT_ADAPTER_CACHE_NUM_COUNTERS=200000
SMQ_MQTT_ADAPTER_CACHE_MAX_COST=1048576
SMQ_MQTT_ADAPTER_CACHE_BUFFER_ITEMS=64
### CoAP
SMQ_COAP_ADAPTER_LOG_LEVEL=debug
@@ -398,6 +404,9 @@ SMQ_COAP_ADAPTER_HTTP_HOST=coap-adapter
SMQ_COAP_ADAPTER_HTTP_PORT=5683
SMQ_COAP_ADAPTER_HTTP_SERVER_CERT=
SMQ_COAP_ADAPTER_HTTP_SERVER_KEY=
SMQ_COAP_ADAPTER_CACHE_NUM_COUNTERS=200000
SMQ_COAP_ADAPTER_CACHE_MAX_COST=1048576
SMQ_COAP_ADAPTER_CACHE_BUFFER_ITEMS=64
SMQ_COAP_ADAPTER_INSTANCE_ID=
### WS
@@ -406,6 +415,9 @@ SMQ_WS_ADAPTER_HTTP_HOST=ws-adapter
SMQ_WS_ADAPTER_HTTP_PORT=8186
SMQ_WS_ADAPTER_HTTP_SERVER_CERT=
SMQ_WS_ADAPTER_HTTP_SERVER_KEY=
SMQ_WS_ADAPTER_CACHE_NUM_COUNTERS=200000
SMQ_WS_ADAPTER_CACHE_MAX_COST=1048576
SMQ_WS_ADAPTER_CACHE_BUFFER_ITEMS=64
SMQ_WS_ADAPTER_INSTANCE_ID=
## Addons Services
+12
View File
@@ -976,6 +976,9 @@ services:
SMQ_MQTT_ADAPTER_WS_TARGET_PORT: ${SMQ_MQTT_ADAPTER_WS_TARGET_PORT}
SMQ_MQTT_ADAPTER_WS_TARGET_PATH: ${SMQ_MQTT_ADAPTER_WS_TARGET_PATH}
SMQ_MQTT_ADAPTER_INSTANCE: ${SMQ_MQTT_ADAPTER_INSTANCE}
SMQ_MQTT_ADAPTER_CACHE_NUM_COUNTERS: ${SMQ_MQTT_ADAPTER_CACHE_NUM_COUNTERS}
SMQ_MQTT_ADAPTER_CACHE_MAX_COST: ${SMQ_MQTT_ADAPTER_CACHE_MAX_COST}
SMQ_MQTT_ADAPTER_CACHE_BUFFER_ITEMS: ${SMQ_MQTT_ADAPTER_CACHE_BUFFER_ITEMS}
SMQ_ES_URL: ${SMQ_ES_URL}
SMQ_CLIENTS_GRPC_URL: ${SMQ_CLIENTS_GRPC_URL}
SMQ_CLIENTS_GRPC_TIMEOUT: ${SMQ_CLIENTS_GRPC_TIMEOUT}
@@ -1061,6 +1064,9 @@ services:
SMQ_HTTP_ADAPTER_PORT: ${SMQ_HTTP_ADAPTER_PORT}
SMQ_HTTP_ADAPTER_SERVER_CERT: ${SMQ_HTTP_ADAPTER_SERVER_CERT}
SMQ_HTTP_ADAPTER_SERVER_KEY: ${SMQ_HTTP_ADAPTER_SERVER_KEY}
SMQ_HTTP_ADAPTER_CACHE_NUM_COUNTERS: ${SMQ_HTTP_ADAPTER_CACHE_NUM_COUNTERS}
SMQ_HTTP_ADAPTER_CACHE_MAX_COST: ${SMQ_HTTP_ADAPTER_CACHE_MAX_COST}
SMQ_HTTP_ADAPTER_CACHE_BUFFER_ITEMS: ${SMQ_HTTP_ADAPTER_CACHE_BUFFER_ITEMS}
SMQ_CLIENTS_GRPC_URL: ${SMQ_CLIENTS_GRPC_URL}
SMQ_CLIENTS_GRPC_TIMEOUT: ${SMQ_CLIENTS_GRPC_TIMEOUT}
SMQ_CLIENTS_GRPC_CLIENT_CERT: ${SMQ_CLIENTS_GRPC_CLIENT_CERT:+/clients-grpc-client.crt}
@@ -1174,6 +1180,9 @@ services:
SMQ_COAP_ADAPTER_HTTP_PORT: ${SMQ_COAP_ADAPTER_HTTP_PORT}
SMQ_COAP_ADAPTER_HTTP_SERVER_CERT: ${SMQ_COAP_ADAPTER_HTTP_SERVER_CERT}
SMQ_COAP_ADAPTER_HTTP_SERVER_KEY: ${SMQ_COAP_ADAPTER_HTTP_SERVER_KEY}
SMQ_COAP_ADAPTER_CACHE_NUM_COUNTERS: ${SMQ_COAP_ADAPTER_CACHE_NUM_COUNTERS}
SMQ_COAP_ADAPTER_CACHE_MAX_COST: ${SMQ_COAP_ADAPTER_CACHE_MAX_COST}
SMQ_COAP_ADAPTER_CACHE_BUFFER_ITEMS: ${SMQ_COAP_ADAPTER_CACHE_BUFFER_ITEMS}
SMQ_CLIENTS_GRPC_URL: ${SMQ_CLIENTS_GRPC_URL}
SMQ_CLIENTS_GRPC_TIMEOUT: ${SMQ_CLIENTS_GRPC_TIMEOUT}
SMQ_CLIENTS_GRPC_CLIENT_CERT: ${SMQ_CLIENTS_GRPC_CLIENT_CERT:+/clients-grpc-client.crt}
@@ -1268,6 +1277,9 @@ services:
SMQ_WS_ADAPTER_HTTP_PORT: ${SMQ_WS_ADAPTER_HTTP_PORT}
SMQ_WS_ADAPTER_HTTP_SERVER_CERT: ${SMQ_WS_ADAPTER_HTTP_SERVER_CERT}
SMQ_WS_ADAPTER_HTTP_SERVER_KEY: ${SMQ_WS_ADAPTER_HTTP_SERVER_KEY}
SMQ_WS_ADAPTER_CACHE_NUM_COUNTERS: ${SMQ_WS_ADAPTER_CACHE_NUM_COUNTERS}
SMQ_WS_ADAPTER_CACHE_MAX_COST: ${SMQ_WS_ADAPTER_CACHE_MAX_COST}
SMQ_WS_ADAPTER_CACHE_BUFFER_ITEMS: ${SMQ_WS_ADAPTER_CACHE_BUFFER_ITEMS}
SMQ_CLIENTS_GRPC_URL: ${SMQ_CLIENTS_GRPC_URL}
SMQ_CLIENTS_GRPC_TIMEOUT: ${SMQ_CLIENTS_GRPC_TIMEOUT}
SMQ_CLIENTS_GRPC_CLIENT_CERT: ${SMQ_CLIENTS_GRPC_CLIENT_CERT:+/clients-grpc-client.crt}
+2
View File
@@ -13,6 +13,7 @@ require (
github.com/authzed/spicedb v1.45.1
github.com/caarlos0/env/v11 v11.3.1
github.com/cenkalti/backoff/v4 v4.3.0
github.com/dgraph-io/ristretto/v2 v2.2.0
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/fatih/color v1.18.0
github.com/go-chi/chi/v5 v5.2.2
@@ -81,6 +82,7 @@ require (
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dsnet/golib/memfile v1.0.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
+6
View File
@@ -75,6 +75,10 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 h1:NMZiJj8QnKe1LgsbDayM4UoHwbvwDRwnI3hwNaAHRnc=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0/go.mod h1:ZXNYxsqcloTdSy/rNShjYzMhyjf0LaoftYK0p+A3h40=
github.com/dgraph-io/ristretto/v2 v2.2.0 h1:bkY3XzJcXoMuELV8F+vS8kzNgicwQFAaGINAEJdWGOM=
github.com/dgraph-io/ristretto/v2 v2.2.0/go.mod h1:RZrm63UmcBAaYWC1DotLYBmTvgkrs0+XhBd7Npn7/zI=
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da h1:aIftn67I1fkbMa512G+w+Pxci9hJPB8oMnkcP3iZF38=
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/docker/cli v27.4.1+incompatible h1:VzPiUlRJ/xh+otB75gva3r05isHMo5wXDfPRi5/b4hI=
@@ -87,6 +91,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dsnet/golib/memfile v1.0.0 h1:J9pUspY2bDCbF9o+YGwcf3uG6MdyITfh/Fk3/CaEiFs=
github.com/dsnet/golib/memfile v1.0.0/go.mod h1:tXGNW9q3RwvWt1VV2qrRKlSSz0npnh12yftCSCy2T64=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
+23 -17
View File
@@ -6,23 +6,26 @@ HTTP adapter provides an HTTP API for sending messages through the platform.
The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values.
| Variable | Description | Default |
| ----------------------------- | ----------------------------------------------------------------------------------- | ----------------------------------- |
| SMQ_HTTP_ADAPTER_LOG_LEVEL | Log level for the HTTP Adapter (debug, info, warn, error) | info |
| SMQ_HTTP_ADAPTER_HOST | Service HTTP host | "" |
| SMQ_HTTP_ADAPTER_PORT | Service HTTP port | 80 |
| SMQ_HTTP_ADAPTER_SERVER_CERT | Path to the PEM encoded server certificate file | "" |
| SMQ_HTTP_ADAPTER_SERVER_KEY | Path to the PEM encoded server key file | "" |
| SMQ_CLIENTS_GRPC_URL | Clients service Auth gRPC URL | <localhost:7000> |
| SMQ_CLIENTS_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s |
| SMQ_CLIENTS_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" |
| SMQ_CLIENTS_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" |
| SMQ_CLIENTS_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" |
| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | <amqp://guest:guest@rabbitmq:5672/> |
| SMQ_JAEGER_URL | Jaeger server URL | <http://localhost:4318/v1/traces> |
| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 |
| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true |
| SMQ_HTTP_ADAPTER_INSTANCE_ID | Service instance ID | "" |
| Variable | Description | Default |
| ----------------------------------- | ----------------------------------------------------------------------------------- | ----------------------------------- |
| SMQ_HTTP_ADAPTER_LOG_LEVEL | Log level for the HTTP Adapter (debug, info, warn, error) | info |
| SMQ_HTTP_ADAPTER_HOST | Service HTTP host | "" |
| SMQ_HTTP_ADAPTER_PORT | Service HTTP port | 80 |
| SMQ_HTTP_ADAPTER_SERVER_CERT | Path to the PEM encoded server certificate file | "" |
| SMQ_HTTP_ADAPTER_SERVER_KEY | Path to the PEM encoded server key file | "" |
| SMQ_HTTP_ADAPTER_CACHE_NUM_COUNTERS | Number of cache counters to keep that hold access frequency information | 200000 |
| SMQ_HTTP_ADAPTER_CACHE_MAX_COST | Maximum size of the cache(in bytes) | 1048576 |
| SMQ_HTTP_ADAPTER_CACHE_BUFFER_ITEMS | Number of cache `Get` buffers | 64 |
| SMQ_CLIENTS_GRPC_URL | Clients service Auth gRPC URL | <localhost:7000> |
| SMQ_CLIENTS_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s |
| SMQ_CLIENTS_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" |
| SMQ_CLIENTS_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" |
| SMQ_CLIENTS_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" |
| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | <amqp://guest:guest@rabbitmq:5672/> |
| SMQ_JAEGER_URL | Jaeger server URL | <http://localhost:4318/v1/traces> |
| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 |
| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true |
| SMQ_HTTP_ADAPTER_INSTANCE_ID | Service instance ID | "" |
## Deployment
@@ -49,6 +52,9 @@ SMQ_HTTP_ADAPTER_HOST=localhost \
SMQ_HTTP_ADAPTER_PORT=80 \
SMQ_HTTP_ADAPTER_SERVER_CERT="" \
SMQ_HTTP_ADAPTER_SERVER_KEY="" \
SMQ_HTTP_ADAPTER_CACHE_NUM_COUNTERS=200000 \
SMQ_HTTP_ADAPTER_CACHE_MAX_COST=1048576 \
SMQ_HTTP_ADAPTER_CACHE_BUFFER_ITEMS=64 \
SMQ_CLIENTS_GRPC_URL=localhost:7000 \
SMQ_CLIENTS_GRPC_TIMEOUT=1s \
SMQ_CLIENTS_GRPC_CLIENT_CERT="" \
+9 -4
View File
@@ -49,10 +49,14 @@ var (
domainID = testsutil.GenerateUUID(&testing.T{})
)
func newService(authn smqauthn.Authentication, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, domains grpcDomainsV1.DomainsServiceClient) (session.Handler, *pubsub.PubSub) {
func newService(authn smqauthn.Authentication, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, domains grpcDomainsV1.DomainsServiceClient) (session.Handler, *pubsub.PubSub, error) {
pub := new(pubsub.PubSub)
resolver := messaging.NewTopicResolver(channels, domains)
return server.NewHandler(pub, authn, clients, channels, resolver, smqlog.NewMock()), pub
parser, err := messaging.NewTopicParser(messaging.DefaultCacheConfig, channels, domains)
if err != nil {
return nil, nil, err
}
return server.NewHandler(pub, authn, clients, channels, parser, smqlog.NewMock()), pub, nil
}
func newTargetHTTPServer() *httptest.Server {
@@ -120,7 +124,8 @@ func TestPublish(t *testing.T) {
msg := `[{"n":"current","t":-1,"v":1.6}]`
msgJSON := `{"field1":"val1","field2":"val2"}`
msgCBOR := `81A3616E6763757272656E746174206176FB3FF999999999999A`
svc, pub := newService(authn, clients, channels, domains)
svc, pub, err := newService(authn, clients, channels, domains)
assert.Nil(t, err, fmt.Sprintf("failed to create service with err: %v", err))
target := newTargetHTTPServer()
defer target.Close()
ts, err := newProxyHTPPServer(svc, target)
+4 -8
View File
@@ -56,19 +56,19 @@ type handler struct {
publisher messaging.Publisher
clients grpcClientsV1.ClientsServiceClient
channels grpcChannelsV1.ChannelsServiceClient
resolver messaging.TopicResolver
parser messaging.TopicParser
authn smqauthn.Authentication
logger *slog.Logger
}
// NewHandler creates new Handler entity.
func NewHandler(publisher messaging.Publisher, authn smqauthn.Authentication, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, resolver messaging.TopicResolver, logger *slog.Logger) session.Handler {
func NewHandler(publisher messaging.Publisher, authn smqauthn.Authentication, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, parser messaging.TopicParser, logger *slog.Logger) session.Handler {
return &handler{
publisher: publisher,
authn: authn,
clients: clients,
channels: channels,
resolver: resolver,
parser: parser,
logger: logger,
}
}
@@ -121,14 +121,10 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
return errors.Wrap(errFailedPublish, errClientNotInitialized)
}
domain, channel, subtopic, err := messaging.ParsePublishTopic(*topic)
domainID, channelID, subtopic, err := h.parser.ParsePublishTopic(ctx, *topic, true)
if err != nil {
return errors.Wrap(errMalformedTopic, err)
}
domainID, channelID, err := h.resolver.Resolve(ctx, domain, channel)
if err != nil {
return errors.Wrap(errFailedPublish, err)
}
var clientID, clientType string
switch {
+6 -5
View File
@@ -70,19 +70,20 @@ var (
domains = new(dmocks.DomainsServiceClient)
)
func newHandler() session.Handler {
func newHandler(t *testing.T) session.Handler {
logger := smqlog.NewMock()
authn = new(authnmocks.Authentication)
clients = new(clmocks.ClientsServiceClient)
channels = new(chmocks.ChannelsServiceClient)
publisher = new(mocks.PubSub)
resolver := messaging.NewTopicResolver(channels, domains)
parser, err := messaging.NewTopicParser(messaging.DefaultCacheConfig, channels, domains)
assert.Nil(t, err, fmt.Sprintf("unexpected error while creating topic parser: %v", err))
return mhttp.NewHandler(publisher, authn, clients, channels, resolver, logger)
return mhttp.NewHandler(publisher, authn, clients, channels, parser, logger)
}
func TestAuthConnect(t *testing.T) {
handler := newHandler()
handler := newHandler(t)
cases := []struct {
desc string
@@ -136,7 +137,7 @@ func TestAuthConnect(t *testing.T) {
}
func TestPublish(t *testing.T) {
handler := newHandler()
handler := newHandler(t)
malformedSubtopics := topic + "/" + subtopic + "%"
+6
View File
@@ -19,6 +19,9 @@ The service is configured using the environment variables presented in the follo
| SMQ_MQTT_ADAPTER_WS_TARGET_HOST | MQTT broker host for MQTT over WS | localhost |
| SMQ_MQTT_ADAPTER_WS_TARGET_PORT | MQTT broker port for MQTT over WS | 8080 |
| SMQ_MQTT_ADAPTER_WS_TARGET_PATH | MQTT broker MQTT over WS path | /mqtt |
| SMQ_MQTT_ADAPTER_CACHE_NUM_COUNTERS | Number of cache counters to keep that hold access frequency information | 200000 |
| SMQ_MQTT_ADAPTER_CACHE_MAX_COST | Maximum size of the cache(in bytes) | 1048576 |
| SMQ_MQTT_ADAPTER_CACHE_BUFFER_ITEMS | Number of cache `Get` buffers | 64 |
| SMQ_MQTT_ADAPTER_INSTANCE | Instance name for MQTT adapter | "" |
| SMQ_CLIENTS_GRPC_URL | Clients service Auth gRPC URL | <localhost:7000> |
| SMQ_CLIENTS_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s |
@@ -63,6 +66,9 @@ SMQ_MQTT_ADAPTER_WS_PORT=8080 \
SMQ_MQTT_ADAPTER_WS_TARGET_HOST=localhost \
SMQ_MQTT_ADAPTER_WS_TARGET_PORT=8080 \
SMQ_MQTT_ADAPTER_WS_TARGET_PATH=/mqtt \
SMQ_MQTT_ADAPTER_CACHE_NUM_COUNTERS=200000 \
SMQ_MQTT_ADAPTER_CACHE_MAX_COST=1048576 \
SMQ_MQTT_ADAPTER_CACHE_BUFFER_ITEMS=64 \
SMQ_MQTT_ADAPTER_INSTANCE="" \
SMQ_CLIENTS_GRPC_URL=localhost:7000 \
SMQ_CLIENTS_GRPC_TIMEOUT=1s \
+6 -4
View File
@@ -57,16 +57,18 @@ type handler struct {
publisher messaging.Publisher
clients grpcClientsV1.ClientsServiceClient
channels grpcChannelsV1.ChannelsServiceClient
parser messaging.TopicParser
logger *slog.Logger
}
// NewHandler creates new Handler entity.
func NewHandler(publisher messaging.Publisher, logger *slog.Logger, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient) session.Handler {
func NewHandler(publisher messaging.Publisher, logger *slog.Logger, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, parser messaging.TopicParser) session.Handler {
return &handler{
logger: logger,
publisher: publisher,
clients: clients,
channels: channels,
parser: parser,
}
}
@@ -110,7 +112,7 @@ func (h *handler) AuthPublish(ctx context.Context, topic *string, payload *[]byt
return ErrClientNotInitialized
}
domainID, chanID, _, err := messaging.ParsePublishTopic(*topic)
domainID, chanID, _, err := h.parser.ParsePublishTopic(ctx, *topic, false)
if err != nil {
return err
}
@@ -130,7 +132,7 @@ func (h *handler) AuthSubscribe(ctx context.Context, topics *[]string) error {
}
for _, topic := range *topics {
domainID, chanID, _, err := messaging.ParseSubscribeTopic(topic)
domainID, chanID, _, err := h.parser.ParseSubscribeTopic(ctx, topic, false)
if err != nil {
return err
}
@@ -161,7 +163,7 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
}
h.logger.Info(fmt.Sprintf(LogInfoPublished, s.ID, *topic))
domainID, chanID, subTopic, err := messaging.ParsePublishTopic(*topic)
domainID, chanID, subTopic, err := h.parser.ParsePublishTopic(ctx, *topic, false)
if err != nil {
return errors.Wrap(ErrFailedPublish, err)
}
+7 -1
View File
@@ -15,6 +15,7 @@ import (
grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1"
chmocks "github.com/absmach/supermq/channels/mocks"
climocks "github.com/absmach/supermq/clients/mocks"
dmocks "github.com/absmach/supermq/domains/mocks"
"github.com/absmach/supermq/internal/testsutil"
smqlog "github.com/absmach/supermq/logger"
"github.com/absmach/supermq/mqtt"
@@ -534,6 +535,11 @@ func newHandler() session.Handler {
}
clients = new(climocks.ClientsServiceClient)
channels = new(chmocks.ChannelsServiceClient)
domains := new(dmocks.DomainsServiceClient)
parser, err := messaging.NewTopicParser(messaging.DefaultCacheConfig, channels, domains)
if err != nil {
log.Fatalf("failed to create topic parser: %s", err)
}
publisher = new(mocks.PubSub)
return mqtt.NewHandler(publisher, logger, clients, channels)
return mqtt.NewHandler(publisher, logger, clients, channels, parser)
}
-110
View File
@@ -1,110 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package messaging
import (
"context"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcCommonV1 "github.com/absmach/supermq/api/grpc/common/v1"
grpcDomainsV1 "github.com/absmach/supermq/api/grpc/domains/v1"
"github.com/absmach/supermq/pkg/errors"
"github.com/gofrs/uuid/v5"
)
var (
ErrEmptyRouteID = errors.New("empty route or id")
ErrFailedResolveDomain = errors.New("failed to resolve domain route")
ErrFailedResolveChannel = errors.New("failed to resolve channel route")
)
// TopicResolver contains definitions for resolving domain and channel IDs
// from their respective routes from the message topic.
type TopicResolver interface {
Resolve(ctx context.Context, domain, channel string) (domainID string, channelID string, err error)
ResolveTopic(ctx context.Context, topic string) (rtopic string, err error)
}
type resolver struct {
channels grpcChannelsV1.ChannelsServiceClient
domains grpcDomainsV1.DomainsServiceClient
}
// NewTopicResolver creates a new instance of TopicResolver.
func NewTopicResolver(channelsClient grpcChannelsV1.ChannelsServiceClient, domainsClient grpcDomainsV1.DomainsServiceClient) TopicResolver {
return &resolver{
channels: channelsClient,
domains: domainsClient,
}
}
func (r *resolver) Resolve(ctx context.Context, domain, channel string) (string, string, error) {
if domain == "" || channel == "" {
return "", "", ErrEmptyRouteID
}
domainID, err := r.resolveDomain(ctx, domain)
if err != nil {
return "", "", errors.Wrap(ErrFailedResolveDomain, err)
}
channelID, err := r.resolveChannel(ctx, channel, domainID)
if err != nil {
return "", "", errors.Wrap(ErrFailedResolveChannel, err)
}
return domainID, channelID, nil
}
func (r *resolver) ResolveTopic(ctx context.Context, topic string) (string, error) {
domain, channel, subtopic, err := ParseTopic(topic)
if err != nil {
return "", errors.Wrap(ErrMalformedTopic, err)
}
domainID, channelID, err := r.Resolve(ctx, domain, channel)
if err != nil {
return "", err
}
rtopic := EncodeAdapterTopic(domainID, channelID, subtopic)
return rtopic, nil
}
func (r *resolver) resolveDomain(ctx context.Context, domain string) (string, error) {
if validateUUID(domain) == nil {
return domain, nil
}
d, err := r.domains.RetrieveByRoute(ctx, &grpcCommonV1.RetrieveByRouteReq{
Route: domain,
})
if err != nil {
return "", err
}
return d.Entity.Id, nil
}
func (r *resolver) resolveChannel(ctx context.Context, channel, domainID string) (string, error) {
if validateUUID(channel) == nil {
return channel, nil
}
c, err := r.channels.RetrieveByRoute(ctx, &grpcCommonV1.RetrieveByRouteReq{
Route: channel,
DomainId: domainID,
})
if err != nil {
return "", err
}
return c.Entity.Id, nil
}
func validateUUID(extID string) (err error) {
id, err := uuid.FromString(extID)
if id.String() != extID || err != nil {
return err
}
return nil
}
-248
View File
@@ -1,248 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package messaging_test
import (
"context"
"fmt"
"testing"
grpcCommonV1 "github.com/absmach/supermq/api/grpc/common/v1"
chmocks "github.com/absmach/supermq/channels/mocks"
dmocks "github.com/absmach/supermq/domains/mocks"
"github.com/absmach/supermq/internal/testsutil"
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/messaging"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
var (
validRoute = "valid-route"
invalidRoute = "invalid-route"
channelID = testsutil.GenerateUUID(&testing.T{})
domainID = testsutil.GenerateUUID(&testing.T{})
topicFmt = "m/%s/c/%s"
)
func setupResolver() (messaging.TopicResolver, *dmocks.DomainsServiceClient, *chmocks.ChannelsServiceClient) {
channels := new(chmocks.ChannelsServiceClient)
domains := new(dmocks.DomainsServiceClient)
resolver := messaging.NewTopicResolver(channels, domains)
return resolver, domains, channels
}
func TestResolve(t *testing.T) {
resolver, domains, channels := setupResolver()
cases := []struct {
desc string
domain string
channel string
domainID string
channelID string
domainsErr error
channelsErr error
err error
}{
{
desc: "valid domainID and channelID",
domain: domainID,
channel: channelID,
domainID: domainID,
channelID: channelID,
err: nil,
},
{
desc: "valid domain route and channel ID",
domain: validRoute,
channel: channelID,
domainID: domainID,
channelID: channelID,
err: nil,
},
{
desc: "valid domain ID and channel route",
domain: domainID,
channel: validRoute,
domainID: domainID,
channelID: channelID,
err: nil,
},
{
desc: "valid domain route and channel route",
domain: validRoute,
channel: validRoute,
domainID: domainID,
channelID: channelID,
err: nil,
},
{
desc: "invalid domain route and valid channel",
domain: invalidRoute,
channel: channelID,
domainID: "",
channelID: "",
domainsErr: svcerr.ErrNotFound,
err: messaging.ErrFailedResolveDomain,
},
{
desc: "valid domain and invalid channel",
domain: domainID,
channel: invalidRoute,
domainID: domainID,
channelID: "",
channelsErr: svcerr.ErrNotFound,
err: messaging.ErrFailedResolveChannel,
},
{
desc: "empty domain",
domain: "",
channel: channelID,
domainID: "",
channelID: "",
err: messaging.ErrEmptyRouteID,
},
{
desc: "empty channel",
domain: domainID,
channel: "",
domainID: domainID,
channelID: "",
err: messaging.ErrEmptyRouteID,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
domainsCall := domains.On("RetrieveByRoute", mock.Anything, &grpcCommonV1.RetrieveByRouteReq{Route: tc.domain}).Return(&grpcCommonV1.RetrieveEntityRes{
Entity: &grpcCommonV1.EntityBasic{
Id: tc.domainID,
},
}, tc.domainsErr)
channelsCall := channels.On("RetrieveByRoute", mock.Anything, &grpcCommonV1.RetrieveByRouteReq{Route: tc.channel, DomainId: tc.domainID}).Return(&grpcCommonV1.RetrieveEntityRes{
Entity: &grpcCommonV1.EntityBasic{
Id: tc.channelID,
},
}, tc.channelsErr)
domainID, channelID, err := resolver.Resolve(context.Background(), tc.domain, tc.channel)
assert.True(t, errors.Contains(err, tc.err), "expected error %v, got %v", tc.err, err)
if err == nil {
assert.Equal(t, tc.domainID, domainID, "expected domain ID %s, got %s", tc.domainID, domainID)
assert.Equal(t, tc.channelID, channelID, "expected channel ID %s, got %s", tc.channelID, channelID)
}
domainsCall.Unset()
channelsCall.Unset()
})
}
}
func TestResolveTopic(t *testing.T) {
resolver, domains, channels := setupResolver()
cases := []struct {
desc string
topic string
domain string
channel string
domainID string
channelID string
domainsErr error
channelsErr error
response string
err error
}{
{
desc: "valid topic with domainID and channelID",
topic: fmt.Sprintf(topicFmt, domainID, channelID),
domain: domainID,
channel: channelID,
domainID: domainID,
channelID: channelID,
response: fmt.Sprintf(topicFmt, domainID, channelID),
err: nil,
},
{
desc: "valid topic with domain route and channel ID",
topic: fmt.Sprintf(topicFmt, validRoute, channelID),
domain: validRoute,
channel: channelID,
domainID: domainID,
channelID: channelID,
response: fmt.Sprintf(topicFmt, domainID, channelID),
err: nil,
},
{
desc: "valid topic with domain ID and channel route",
topic: fmt.Sprintf(topicFmt, domainID, validRoute),
domain: domainID,
channel: validRoute,
domainID: domainID,
channelID: channelID,
response: fmt.Sprintf(topicFmt, domainID, channelID),
err: nil,
},
{
desc: "valid topic with domain route and channel route",
topic: fmt.Sprintf(topicFmt, validRoute, validRoute),
domain: validRoute,
channel: validRoute,
domainID: domainID,
channelID: channelID,
response: fmt.Sprintf(topicFmt, domainID, channelID),
err: nil,
},
{
desc: "invalid topic with invalid domain route and valid channel",
topic: fmt.Sprintf(topicFmt, invalidRoute, channelID),
domain: invalidRoute,
channel: channelID,
domainID: "",
channelID: "",
domainsErr: svcerr.ErrNotFound,
err: messaging.ErrFailedResolveDomain,
},
{
desc: "valid topic with valid topic with domainID and channelID and subtopic",
topic: fmt.Sprintf(topicFmt, domainID, channelID) + "/subtopic",
domain: domainID,
channel: channelID,
domainID: domainID,
channelID: channelID,
response: fmt.Sprintf(topicFmt, domainID, channelID) + "/subtopic",
err: nil,
},
{
desc: "invalid topic with empty domain",
topic: fmt.Sprintf(topicFmt, "", channelID),
domain: "",
channel: channelID,
domainID: "",
channelID: "",
err: messaging.ErrMalformedTopic,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
domainsCall := domains.On("RetrieveByRoute", mock.Anything, &grpcCommonV1.RetrieveByRouteReq{Route: tc.domain}).Return(&grpcCommonV1.RetrieveEntityRes{
Entity: &grpcCommonV1.EntityBasic{
Id: tc.domainID,
},
}, tc.domainsErr)
channelsCall := channels.On("RetrieveByRoute", mock.Anything, &grpcCommonV1.RetrieveByRouteReq{Route: tc.channel, DomainId: tc.domainID}).Return(&grpcCommonV1.RetrieveEntityRes{
Entity: &grpcCommonV1.EntityBasic{
Id: tc.channelID,
},
}, tc.channelsErr)
rtopic, err := resolver.ResolveTopic(context.Background(), tc.topic)
assert.True(t, errors.Contains(err, tc.err), "expected error %v, got %v", tc.err, err)
if err == nil {
assert.Equal(t, tc.response, rtopic, "expected topic %s, got %s", tc.response, rtopic)
}
domainsCall.Unset()
channelsCall.Unset()
})
}
}
+211 -2
View File
@@ -4,11 +4,17 @@
package messaging
import (
"context"
"fmt"
"net/url"
"strings"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcCommonV1 "github.com/absmach/supermq/api/grpc/common/v1"
grpcDomainsV1 "github.com/absmach/supermq/api/grpc/domains/v1"
"github.com/absmach/supermq/pkg/errors"
"github.com/dgraph-io/ristretto/v2"
"github.com/gofrs/uuid/v5"
)
const (
@@ -17,15 +23,218 @@ const (
)
var (
ErrMalformedTopic = errors.New("malformed topic")
ErrMalformedSubtopic = errors.New("malformed subtopic")
mqWildcards = "+#"
wildcards = "*>"
subtopicInvalidChars = " #+"
wildcardsReplacer = strings.NewReplacer("+", "*", "#", ">")
pathReplacer = strings.NewReplacer("/", ".")
DefaultCacheConfig = CacheConfig{
NumCounters: 2e5, // 200k
MaxCost: 1 << 20, // 1MB
BufferItems: 64,
}
ErrMalformedTopic = errors.New("malformed topic")
ErrMalformedSubtopic = errors.New("malformed subtopic")
ErrEmptyRouteID = errors.New("empty route or id")
ErrFailedResolveDomain = errors.New("failed to resolve domain route")
ErrFailedResolveChannel = errors.New("failed to resolve channel route")
ErrCreateCache = errors.New("failed to create cache")
)
type CacheConfig struct {
NumCounters int64 `env:"NUM_COUNTERS" envDefault:"200000"` // number of keys to track frequency of.
MaxCost int64 `env:"MAX_COST" envDefault:"1048576"` // maximum cost of cache.
BufferItems int64 `env:"BUFFER_ITEMS" envDefault:"64"` // number of keys per Get buffer.
}
type parsedTopic struct {
domainID string
channelID string
subtopic string
err error
}
// TopicParser defines methods for parsing publish and subscribe topics.
// It uses a cache to store parsed topics for quick retrieval.
// It also resolves domain and channel IDs if requested.
type TopicParser interface {
ParsePublishTopic(ctx context.Context, topic string, resolve bool) (domainID, channelID, subtopic string, err error)
ParseSubscribeTopic(ctx context.Context, topic string, resolve bool) (domainID, channelID, subtopic string, err error)
}
type parser struct {
resolver TopicResolver
cache *ristretto.Cache[string, *parsedTopic]
}
// NewTopicParser creates a new instance of TopicParser.
func NewTopicParser(cfg CacheConfig, channels grpcChannelsV1.ChannelsServiceClient, domains grpcDomainsV1.DomainsServiceClient) (TopicParser, error) {
cache, err := ristretto.NewCache(&ristretto.Config[string, *parsedTopic]{
NumCounters: cfg.NumCounters,
MaxCost: cfg.MaxCost,
BufferItems: cfg.BufferItems,
Cost: costFunc,
})
if err != nil {
return nil, errors.Wrap(ErrCreateCache, err)
}
return &parser{
cache: cache,
resolver: NewTopicResolver(channels, domains),
}, nil
}
func (p *parser) ParsePublishTopic(ctx context.Context, topic string, resolve bool) (string, string, string, error) {
val, ok := p.cache.Get(topic)
if ok {
return val.domainID, val.channelID, val.subtopic, val.err
}
domainID, channelID, subtopic, err := ParsePublishTopic(topic)
if err != nil {
p.saveToCache(topic, "", "", "", err)
return "", "", "", err
}
var isRoute bool
if resolve {
domainID, channelID, isRoute, err = p.resolver.Resolve(ctx, domainID, channelID)
if err != nil {
return "", "", "", err
}
}
if !isRoute {
p.saveToCache(topic, domainID, channelID, subtopic, nil)
}
return domainID, channelID, subtopic, nil
}
func (p *parser) ParseSubscribeTopic(ctx context.Context, topic string, resolve bool) (string, string, string, error) {
domainID, channelID, subtopic, err := ParseSubscribeTopic(topic)
if err != nil {
return "", "", "", err
}
if resolve {
domainID, channelID, _, err = p.resolver.Resolve(ctx, domainID, channelID)
if err != nil {
return "", "", "", err
}
}
return domainID, channelID, subtopic, nil
}
func (p *parser) saveToCache(topic string, domainID, channelID, subtopic string, err error) {
p.cache.Set(topic, &parsedTopic{
domainID: domainID,
channelID: channelID,
subtopic: subtopic,
err: err,
}, 0)
}
func costFunc(val *parsedTopic) int64 {
errLen := 0
if val.err != nil {
errLen = len(val.err.Error())
}
cost := int64(len(val.domainID) + len(val.channelID) + len(val.subtopic) + errLen)
return cost
}
// TopicResolver contains definitions for resolving domain and channel IDs
// from their respective routes from the message topic.
type TopicResolver interface {
Resolve(ctx context.Context, domain, channel string) (domainID string, channelID string, isRoute bool, err error)
ResolveTopic(ctx context.Context, topic string) (rtopic string, err error)
}
type resolver struct {
channels grpcChannelsV1.ChannelsServiceClient
domains grpcDomainsV1.DomainsServiceClient
}
// NewTopicResolver creates a new instance of TopicResolver.
func NewTopicResolver(channelsClient grpcChannelsV1.ChannelsServiceClient, domainsClient grpcDomainsV1.DomainsServiceClient) TopicResolver {
return &resolver{
channels: channelsClient,
domains: domainsClient,
}
}
func (r *resolver) Resolve(ctx context.Context, domain, channel string) (string, string, bool, error) {
if domain == "" || channel == "" {
return "", "", false, ErrEmptyRouteID
}
domainID, isdomainRoute, err := r.resolveDomain(ctx, domain)
if err != nil {
return "", "", false, errors.Wrap(ErrFailedResolveDomain, err)
}
channelID, isChannelRoute, err := r.resolveChannel(ctx, channel, domainID)
if err != nil {
return "", "", false, errors.Wrap(ErrFailedResolveChannel, err)
}
isRoute := isdomainRoute || isChannelRoute
return domainID, channelID, isRoute, nil
}
func (r *resolver) ResolveTopic(ctx context.Context, topic string) (string, error) {
domain, channel, subtopic, err := ParseTopic(topic)
if err != nil {
return "", errors.Wrap(ErrMalformedTopic, err)
}
domainID, channelID, _, err := r.Resolve(ctx, domain, channel)
if err != nil {
return "", err
}
rtopic := EncodeAdapterTopic(domainID, channelID, subtopic)
return rtopic, nil
}
func (r *resolver) resolveDomain(ctx context.Context, domain string) (string, bool, error) {
if validateUUID(domain) == nil {
return domain, false, nil
}
d, err := r.domains.RetrieveByRoute(ctx, &grpcCommonV1.RetrieveByRouteReq{
Route: domain,
})
if err != nil {
return "", false, err
}
return d.Entity.Id, true, nil
}
func (r *resolver) resolveChannel(ctx context.Context, channel, domainID string) (string, bool, error) {
if validateUUID(channel) == nil {
return channel, false, nil
}
c, err := r.channels.RetrieveByRoute(ctx, &grpcCommonV1.RetrieveByRouteReq{
Route: channel,
DomainId: domainID,
})
if err != nil {
return "", false, err
}
return c.Entity.Id, true, nil
}
func validateUUID(extID string) (err error) {
id, err := uuid.FromString(extID)
if id.String() != extID || err != nil {
return err
}
return nil
}
func ParsePublishTopic(topic string) (domainID, chanID, subtopic string, err error) {
domainID, chanID, subtopic, err = ParseTopic(topic)
if err != nil {
+485
View File
@@ -4,13 +4,52 @@
package messaging_test
import (
"context"
"fmt"
"testing"
"time"
grpcCommonV1 "github.com/absmach/supermq/api/grpc/common/v1"
chmocks "github.com/absmach/supermq/channels/mocks"
dmocks "github.com/absmach/supermq/domains/mocks"
"github.com/absmach/supermq/internal/testsutil"
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/messaging"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
var (
validRoute = "valid-route"
invalidRoute = "invalid-route"
channelID = testsutil.GenerateUUID(&testing.T{})
domainID = testsutil.GenerateUUID(&testing.T{})
topicFmt = "m/%s/c/%s"
subtopic = "subtopic"
topicSubtopicFmt = "m/%s/c/%s/%s"
cachedTopic = fmt.Sprintf(topicSubtopicFmt, domainID, channelID, subtopic)
)
func setupResolver() (messaging.TopicResolver, *dmocks.DomainsServiceClient, *chmocks.ChannelsServiceClient) {
channels := new(chmocks.ChannelsServiceClient)
domains := new(dmocks.DomainsServiceClient)
resolver := messaging.NewTopicResolver(channels, domains)
return resolver, domains, channels
}
func setupParser() (messaging.TopicParser, *dmocks.DomainsServiceClient, *chmocks.ChannelsServiceClient, error) {
channels := new(chmocks.ChannelsServiceClient)
domains := new(dmocks.DomainsServiceClient)
parser, err := messaging.NewTopicParser(messaging.DefaultCacheConfig, channels, domains)
if err != nil {
return nil, nil, nil, err
}
return parser, domains, channels, nil
}
var ParsePublisherTopicTestCases = []struct {
desc string
topic string
@@ -457,3 +496,449 @@ func TestMessage_EncodeToMQTTTopic(t *testing.T) {
})
}
}
func TestResolve(t *testing.T) {
resolver, domains, channels := setupResolver()
cases := []struct {
desc string
domain string
channel string
domainID string
channelID string
isRoute bool
domainsErr error
channelsErr error
err error
}{
{
desc: "valid domainID and channelID",
domain: domainID,
channel: channelID,
domainID: domainID,
channelID: channelID,
isRoute: false,
err: nil,
},
{
desc: "valid domain route and channel ID",
domain: validRoute,
channel: channelID,
domainID: domainID,
channelID: channelID,
isRoute: true,
err: nil,
},
{
desc: "valid domain ID and channel route",
domain: domainID,
channel: validRoute,
domainID: domainID,
channelID: channelID,
isRoute: true,
err: nil,
},
{
desc: "valid domain route and channel route",
domain: validRoute,
channel: validRoute,
domainID: domainID,
channelID: channelID,
isRoute: true,
err: nil,
},
{
desc: "invalid domain route and valid channel",
domain: invalidRoute,
channel: channelID,
domainID: "",
channelID: "",
domainsErr: svcerr.ErrNotFound,
err: messaging.ErrFailedResolveDomain,
},
{
desc: "valid domain and invalid channel",
domain: domainID,
channel: invalidRoute,
domainID: domainID,
channelID: "",
channelsErr: svcerr.ErrNotFound,
err: messaging.ErrFailedResolveChannel,
},
{
desc: "empty domain",
domain: "",
channel: channelID,
domainID: "",
channelID: "",
err: messaging.ErrEmptyRouteID,
},
{
desc: "empty channel",
domain: domainID,
channel: "",
domainID: domainID,
channelID: "",
err: messaging.ErrEmptyRouteID,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
domainsCall := domains.On("RetrieveByRoute", mock.Anything, &grpcCommonV1.RetrieveByRouteReq{Route: tc.domain}).Return(&grpcCommonV1.RetrieveEntityRes{
Entity: &grpcCommonV1.EntityBasic{
Id: tc.domainID,
},
}, tc.domainsErr)
channelsCall := channels.On("RetrieveByRoute", mock.Anything, &grpcCommonV1.RetrieveByRouteReq{Route: tc.channel, DomainId: tc.domainID}).Return(&grpcCommonV1.RetrieveEntityRes{
Entity: &grpcCommonV1.EntityBasic{
Id: tc.channelID,
},
}, tc.channelsErr)
domainID, channelID, isRoute, err := resolver.Resolve(context.Background(), tc.domain, tc.channel)
assert.True(t, errors.Contains(err, tc.err), "expected error %v, got %v", tc.err, err)
if err == nil {
assert.Equal(t, tc.domainID, domainID, "expected domain ID %s, got %s", tc.domainID, domainID)
assert.Equal(t, tc.channelID, channelID, "expected channel ID %s, got %s", tc.channelID, channelID)
assert.Equal(t, tc.isRoute, isRoute, "expected isRoute %t, got %t", tc.isRoute, isRoute)
}
domainsCall.Unset()
channelsCall.Unset()
})
}
}
func TestResolveTopic(t *testing.T) {
resolver, domains, channels := setupResolver()
cases := []struct {
desc string
topic string
domain string
channel string
domainID string
channelID string
domainsErr error
channelsErr error
response string
err error
}{
{
desc: "valid topic with domainID and channelID",
topic: fmt.Sprintf(topicFmt, domainID, channelID),
domain: domainID,
channel: channelID,
domainID: domainID,
channelID: channelID,
response: fmt.Sprintf(topicFmt, domainID, channelID),
err: nil,
},
{
desc: "valid topic with domain route and channel ID",
topic: fmt.Sprintf(topicFmt, validRoute, channelID),
domain: validRoute,
channel: channelID,
domainID: domainID,
channelID: channelID,
response: fmt.Sprintf(topicFmt, domainID, channelID),
err: nil,
},
{
desc: "valid topic with domain ID and channel route",
topic: fmt.Sprintf(topicFmt, domainID, validRoute),
domain: domainID,
channel: validRoute,
domainID: domainID,
channelID: channelID,
response: fmt.Sprintf(topicFmt, domainID, channelID),
err: nil,
},
{
desc: "valid topic with domain route and channel route",
topic: fmt.Sprintf(topicFmt, validRoute, validRoute),
domain: validRoute,
channel: validRoute,
domainID: domainID,
channelID: channelID,
response: fmt.Sprintf(topicFmt, domainID, channelID),
err: nil,
},
{
desc: "invalid topic with invalid domain route and valid channel",
topic: fmt.Sprintf(topicFmt, invalidRoute, channelID),
domain: invalidRoute,
channel: channelID,
domainID: "",
channelID: "",
domainsErr: svcerr.ErrNotFound,
err: messaging.ErrFailedResolveDomain,
},
{
desc: "valid topic with valid topic with domainID and channelID and subtopic",
topic: fmt.Sprintf(topicFmt, domainID, channelID) + "/subtopic",
domain: domainID,
channel: channelID,
domainID: domainID,
channelID: channelID,
response: fmt.Sprintf(topicFmt, domainID, channelID) + "/subtopic",
err: nil,
},
{
desc: "invalid topic with empty domain",
topic: fmt.Sprintf(topicFmt, "", channelID),
domain: "",
channel: channelID,
domainID: "",
channelID: "",
err: messaging.ErrMalformedTopic,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
domainsCall := domains.On("RetrieveByRoute", mock.Anything, &grpcCommonV1.RetrieveByRouteReq{Route: tc.domain}).Return(&grpcCommonV1.RetrieveEntityRes{
Entity: &grpcCommonV1.EntityBasic{
Id: tc.domainID,
},
}, tc.domainsErr)
channelsCall := channels.On("RetrieveByRoute", mock.Anything, &grpcCommonV1.RetrieveByRouteReq{Route: tc.channel, DomainId: tc.domainID}).Return(&grpcCommonV1.RetrieveEntityRes{
Entity: &grpcCommonV1.EntityBasic{
Id: tc.channelID,
},
}, tc.channelsErr)
rtopic, err := resolver.ResolveTopic(context.Background(), tc.topic)
assert.True(t, errors.Contains(err, tc.err), "expected error %v, got %v", tc.err, err)
if err == nil {
assert.Equal(t, tc.response, rtopic, "expected topic %s, got %s", tc.response, rtopic)
}
domainsCall.Unset()
channelsCall.Unset()
})
}
}
func TestParserPublishTopic(t *testing.T) {
parser, domains, channels, err := setupParser()
assert.Nil(t, err, fmt.Sprintf("unexpected error while setting up parser: %v", err))
udomainID := testsutil.GenerateUUID(t)
uchannelID := testsutil.GenerateUUID(t)
cachedInvalidTopic := "m/invalid-domain/c"
dom, ch, st, err := parser.ParsePublishTopic(context.Background(), cachedTopic, false)
assert.Nil(t, err, fmt.Sprintf("unexpected error while publishing topic: %v", err))
assert.Equal(t, domainID, dom, "expected domainID %s, got %s", domainID, dom)
assert.Equal(t, channelID, ch, "expected channelID %s, got %s", channelID, ch)
assert.Equal(t, subtopic, st, "expected subtopic %s, got %s", subtopic, st)
dom, ch, st, err = parser.ParsePublishTopic(context.Background(), cachedInvalidTopic, false)
assert.NotNil(t, err, "expected error for invalid cached topic")
assert.Equal(t, "", dom, "expected empty domainID for invalid topic")
assert.Equal(t, "", ch, "expected empty channelID for invalid topic")
assert.Equal(t, "", st, "expected empty subtopic for invalid topic")
time.Sleep(10 * time.Millisecond) // Ensure cache is populated
cases := []struct {
desc string
topic string
resolve bool
domain string
channel string
domainID string
channelID string
domainsErr error
channelsErr error
err error
}{
{
desc: "valid uncached topic with domainID and channelID",
topic: fmt.Sprintf(topicFmt, udomainID, uchannelID) + "/subtopic",
resolve: true,
domain: udomainID,
channel: uchannelID,
domainID: udomainID,
channelID: uchannelID,
err: nil,
},
{
desc: "valid cached topic with domainID and channelID",
topic: cachedTopic,
domain: domainID,
channel: channelID,
domainID: domainID,
channelID: channelID,
err: nil,
},
{
desc: "invalid uncached topic with invalid format",
topic: "invalid-topic",
domain: "",
channel: "",
domainID: "",
channelID: "",
err: messaging.ErrMalformedTopic,
},
{
desc: "invalid cached topic with invalid format",
topic: cachedInvalidTopic,
domain: "",
channel: "",
domainID: "",
channelID: "",
err: messaging.ErrMalformedTopic,
},
{
desc: "valid uncached topic with domain and channel routes",
topic: fmt.Sprintf(topicFmt, validRoute, validRoute) + "/subtopic",
resolve: true,
domain: validRoute,
channel: validRoute,
domainID: domainID,
channelID: channelID,
err: nil,
},
{
desc: "valid uncached topic with failed domain resolution",
topic: fmt.Sprintf(topicFmt, invalidRoute, uchannelID) + "/subtopic",
resolve: true,
domain: invalidRoute,
channel: uchannelID,
domainID: "",
channelID: "",
domainsErr: svcerr.ErrNotFound,
err: messaging.ErrFailedResolveDomain,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
domainsCall := domains.On("RetrieveByRoute", mock.Anything, &grpcCommonV1.RetrieveByRouteReq{Route: tc.domain}).Return(&grpcCommonV1.RetrieveEntityRes{
Entity: &grpcCommonV1.EntityBasic{
Id: tc.domainID,
},
}, tc.domainsErr)
channelsCall := channels.On("RetrieveByRoute", mock.Anything, &grpcCommonV1.RetrieveByRouteReq{Route: tc.channel, DomainId: tc.domainID}).Return(&grpcCommonV1.RetrieveEntityRes{
Entity: &grpcCommonV1.EntityBasic{
Id: tc.channelID,
},
}, tc.channelsErr)
domainID, channelID, subtopic, err := parser.ParsePublishTopic(context.Background(), tc.topic, tc.resolve)
assert.True(t, errors.Contains(err, tc.err), "expected error %v, got %v", tc.err, err)
if err == nil {
assert.Equal(t, tc.domainID, domainID, "expected domainID %s, got %s", tc.domainID, domainID)
assert.Equal(t, tc.channelID, channelID, "expected channelID %s, got %s", tc.channelID, channelID)
assert.Equal(t, subtopic, "subtopic", "expected subtopic %s, got %s", "subtopic", subtopic)
}
domainsCall.Unset()
channelsCall.Unset()
})
}
}
func BenchmarkParserPublishTopic(b *testing.B) {
parser, _, _, err := setupParser()
if err != nil {
b.Fatalf("unexpected error while setting up parser: %v", err)
}
for _, tc := range ParsePublisherTopicTestCases {
b.Run(tc.desc, func(b *testing.B) {
for b.Loop() {
_, _, _, _ = parser.ParsePublishTopic(context.Background(), tc.topic, false)
}
})
}
}
func TestParserSubscribeTopic(t *testing.T) {
parser, domains, channels, err := setupParser()
assert.Nil(t, err, fmt.Sprintf("unexpected error while setting up parser: %v", err))
cases := []struct {
desc string
topic string
resolve bool
domain string
channel string
domainID string
channelID string
subtopic string
domainsErr error
channelsErr error
err error
}{
{
desc: "valid topic with domainID and channelID",
topic: fmt.Sprintf(topicFmt, domainID, channelID),
resolve: true,
domain: domainID,
channel: channelID,
domainID: domainID,
channelID: channelID,
err: nil,
},
{
desc: "valid topic with domainID and channelID and subtopic",
topic: fmt.Sprintf(topicSubtopicFmt, domainID, channelID, subtopic),
resolve: true,
domain: domainID,
channel: channelID,
domainID: domainID,
channelID: channelID,
subtopic: subtopic,
err: nil,
},
{
desc: "valid topic with domain and channel routes",
topic: fmt.Sprintf(topicFmt, validRoute, validRoute),
resolve: true,
domain: validRoute,
channel: validRoute,
domainID: domainID,
channelID: channelID,
err: nil,
},
{
desc: "invalid topic with invalid format",
topic: "invalid-topic",
resolve: false,
domain: "",
channel: "",
domainID: "",
channelID: "",
err: messaging.ErrMalformedTopic,
},
{
desc: "valid topic with invalid domain route",
topic: fmt.Sprintf(topicFmt, invalidRoute, validRoute),
resolve: true,
domain: invalidRoute,
channel: validRoute,
domainID: "",
channelID: "",
domainsErr: svcerr.ErrNotFound,
err: messaging.ErrFailedResolveDomain,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
domainsCall := domains.On("RetrieveByRoute", mock.Anything, &grpcCommonV1.RetrieveByRouteReq{Route: tc.domain}).Return(&grpcCommonV1.RetrieveEntityRes{
Entity: &grpcCommonV1.EntityBasic{
Id: tc.domainID,
},
}, tc.domainsErr)
channelsCall := channels.On("RetrieveByRoute", mock.Anything, &grpcCommonV1.RetrieveByRouteReq{Route: tc.channel, DomainId: tc.domainID}).Return(&grpcCommonV1.RetrieveEntityRes{
Entity: &grpcCommonV1.EntityBasic{
Id: tc.channelID,
},
}, tc.channelsErr)
dom, ch, st, err := parser.ParseSubscribeTopic(context.Background(), tc.topic, tc.resolve)
assert.True(t, errors.Contains(err, tc.err), "expected error %v, got %v", tc.err, err)
if err == nil {
assert.Equal(t, tc.domainID, dom, "expected domainID %s, got %s", tc.domainID, dom)
assert.Equal(t, tc.channelID, ch, "expected channelID %s, got %s", tc.channelID, ch)
assert.Equal(t, tc.subtopic, st, "expected subtopic %s, got %s", tc.subtopic, st)
}
domainsCall.Unset()
channelsCall.Unset()
})
}
}
+1 -1
View File
@@ -23,7 +23,7 @@ func TestHealth(t *testing.T) {
certsTs, _, _ := setupCerts()
defer certsTs.Close()
httpAdapterTs, _ := setupMessages()
httpAdapterTs, _ := setupMessages(t)
defer httpAdapterTs.Close()
sdkConf := sdk.Config{
+7 -5
View File
@@ -41,14 +41,16 @@ var (
domainsGRPCClient *dmocks.DomainsServiceClient
)
func setupMessages() (*httptest.Server, *pubsub.PubSub) {
func setupMessages(t *testing.T) (*httptest.Server, *pubsub.PubSub) {
clientsGRPCClient = new(climocks.ClientsServiceClient)
channelsGRPCClient = new(chmocks.ChannelsServiceClient)
domainsGRPCClient = new(dmocks.DomainsServiceClient)
pub := new(pubsub.PubSub)
authn := new(authnmocks.Authentication)
resolver := messaging.NewTopicResolver(channelsGRPCClient, domainsGRPCClient)
handler := adapter.NewHandler(pub, authn, clientsGRPCClient, channelsGRPCClient, resolver, smqlog.NewMock())
parser, err := messaging.NewTopicParser(messaging.DefaultCacheConfig, channelsGRPCClient, domainsGRPCClient)
assert.Nil(t, err, fmt.Sprintf("unexpected error while setting up parser: %v", err))
handler := adapter.NewHandler(pub, authn, clientsGRPCClient, channelsGRPCClient, parser, smqlog.NewMock())
mux := api.MakeHandler(smqlog.NewMock(), "")
target := httptest.NewServer(mux)
@@ -74,7 +76,7 @@ func setupMessages() (*httptest.Server, *pubsub.PubSub) {
}
func TestSendMessage(t *testing.T) {
ts, pub := setupMessages()
ts, pub := setupMessages(t)
defer ts.Close()
msg := `[{"n":"current","t":-1,"v":1.6}]`
@@ -203,7 +205,7 @@ func TestSendMessage(t *testing.T) {
}
func TestSetContentType(t *testing.T) {
ts, _ := setupMessages()
ts, _ := setupMessages(t)
defer ts.Close()
sdkConf := sdk.Config{
+23 -17
View File
@@ -6,23 +6,26 @@ WebSocket adapter provides a [WebSocket](https://en.wikipedia.org/wiki/WebSocket
The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values.
| Variable | Description | Default |
| ------------------------------- | ----------------------------------------------------------------------------------- | ----------------------------------- |
| SMQ_WS_ADAPTER_LOG_LEVEL | Log level for the WS Adapter (debug, info, warn, error) | info |
| SMQ_WS_ADAPTER_HTTP_HOST | Service WS host | "" |
| SMQ_WS_ADAPTER_HTTP_PORT | Service WS port | 8190 |
| SMQ_WS_ADAPTER_HTTP_SERVER_CERT | Path to the PEM encoded server certificate file | "" |
| SMQ_WS_ADAPTER_HTTP_SERVER_KEY | Path to the PEM encoded server key file | "" |
| SMQ_CLIENTS_GRPC_URL | Clients service Auth gRPC URL | <localhost:7000> |
| SMQ_CLIENTS_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s |
| SMQ_CLIENTS_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" |
| SMQ_CLIENTS_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" |
| SMQ_CLIENTS_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" |
| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | <amqp://guest:guest@rabbitmq:5672/> |
| SMQ_JAEGER_URL | Jaeger server URL | <http://localhost:4318/v1/traces> |
| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 |
| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true |
| SMQ_WS_ADAPTER_INSTANCE_ID | Service instance ID | "" |
| Variable | Description | Default |
| --------------------------------- | ----------------------------------------------------------------------------------- | ----------------------------------- |
| SMQ_WS_ADAPTER_LOG_LEVEL | Log level for the WS Adapter (debug, info, warn, error) | info |
| SMQ_WS_ADAPTER_HTTP_HOST | Service WS host | "" |
| SMQ_WS_ADAPTER_HTTP_PORT | Service WS port | 8190 |
| SMQ_WS_ADAPTER_HTTP_SERVER_CERT | Path to the PEM encoded server certificate file | "" |
| SMQ_WS_ADAPTER_HTTP_SERVER_KEY | Path to the PEM encoded server key file | "" |
| SMQ_WS_ADAPTER_CACHE_NUM_COUNTERS | Number of cache counters to keep that hold access frequency information | 200000 |
| SMQ_WS_ADAPTER_CACHE_MAX_COST | Maximum size of the cache(in bytes) | 1048576 |
| SMQ_WS_ADAPTER_CACHE_BUFFER_ITEMS | Number of cache `Get` buffers | 64 |
| SMQ_CLIENTS_GRPC_URL | Clients service Auth gRPC URL | <localhost:7000> |
| SMQ_CLIENTS_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s |
| SMQ_CLIENTS_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" |
| SMQ_CLIENTS_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" |
| SMQ_CLIENTS_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" |
| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | <amqp://guest:guest@rabbitmq:5672/> |
| SMQ_JAEGER_URL | Jaeger server URL | <http://localhost:4318/v1/traces> |
| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 |
| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true |
| SMQ_WS_ADAPTER_INSTANCE_ID | Service instance ID | "" |
## Deployment
@@ -49,6 +52,9 @@ SMQ_WS_ADAPTER_HTTP_HOST=localhost \
SMQ_WS_ADAPTER_HTTP_PORT=8190 \
SMQ_WS_ADAPTER_HTTP_SERVER_CERT="" \
SMQ_WS_ADAPTER_HTTP_SERVER_KEY="" \
SMQ_WS_ADAPTER_CACHE_NUM_COUNTERS=200000 \
SMQ_WS_ADAPTER_CACHE_MAX_COST=1048576 \
SMQ_WS_ADAPTER_CACHE_BUFFER_ITEMS=64 \
SMQ_CLIENTS_GRPC_URL=localhost:7000 \
SMQ_CLIENTS_GRPC_TIMEOUT=1s \
SMQ_CLIENTS_GRPC_CLIENT_CERT="" \
+3 -1
View File
@@ -117,10 +117,12 @@ func TestHandshake(t *testing.T) {
authn := new(authnMocks.Authentication)
domains := new(dmocks.DomainsServiceClient)
resolver := messaging.NewTopicResolver(channels, domains)
parser, err := messaging.NewTopicParser(messaging.DefaultCacheConfig, channels, domains)
require.Nil(t, err, fmt.Sprintf("unexpected error while setting up parser: %v", err))
svc, pubsub := newService(clients, channels)
target := newHTTPServer(svc, resolver)
defer target.Close()
handler := ws.NewHandler(pubsub, smqlog.NewMock(), authn, clients, channels, resolver)
handler := ws.NewHandler(pubsub, smqlog.NewMock(), authn, clients, channels, parser)
ts, err := newProxyHTPPServer(handler, target)
require.Nil(t, err)
defer ts.Close()
+1 -1
View File
@@ -79,7 +79,7 @@ func decodeRequest(r *http.Request, resolver messaging.TopicResolver, logger *sl
domain := chi.URLParam(r, "domain")
channel := chi.URLParam(r, "channel")
domainID, channelID, err := resolver.Resolve(r.Context(), domain, channel)
domainID, channelID, _, err := resolver.Resolve(r.Context(), domain, channel)
if err != nil {
return connReq{}, err
}
+7 -19
View File
@@ -52,18 +52,18 @@ type handler struct {
channels grpcChannelsV1.ChannelsServiceClient
authn smqauthn.Authentication
logger *slog.Logger
resolver messaging.TopicResolver
parser messaging.TopicParser
}
// NewHandler creates new Handler entity.
func NewHandler(pubsub messaging.PubSub, logger *slog.Logger, authn smqauthn.Authentication, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, resolver messaging.TopicResolver) session.Handler {
func NewHandler(pubsub messaging.PubSub, logger *slog.Logger, authn smqauthn.Authentication, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, parser messaging.TopicParser) session.Handler {
return &handler{
logger: logger,
pubsub: pubsub,
authn: authn,
clients: clients,
channels: channels,
resolver: resolver,
parser: parser,
}
}
@@ -92,11 +92,7 @@ func (h *handler) AuthPublish(ctx context.Context, topic *string, payload *[]byt
token = string(s.Password)
}
domain, channel, _, err := messaging.ParsePublishTopic(*topic)
if err != nil {
return mgate.NewHTTPProxyError(http.StatusBadRequest, errors.Wrap(errFailedPublish, err))
}
domainID, channelID, err := h.resolver.Resolve(ctx, domain, channel)
domainID, channelID, _, err := h.parser.ParsePublishTopic(ctx, *topic, true)
if err != nil {
return mgate.NewHTTPProxyError(http.StatusBadRequest, errors.Wrap(errFailedPublish, err))
}
@@ -125,15 +121,11 @@ func (h *handler) AuthSubscribe(ctx context.Context, topics *[]string) error {
}
for _, topic := range *topics {
domain, channel, _, err := messaging.ParseSubscribeTopic(topic)
domainID, channelID, _, err := h.parser.ParseSubscribeTopic(ctx, topic, true)
if err != nil {
return err
}
domainID, chanID, err := h.resolver.Resolve(ctx, domain, channel)
if err != nil {
return mgate.NewHTTPProxyError(http.StatusBadRequest, errors.Wrap(errFailedPublish, err))
}
if _, _, err := h.authAccess(ctx, string(s.Password), domainID, chanID, connections.Subscribe); err != nil {
if _, _, err := h.authAccess(ctx, string(s.Password), domainID, channelID, connections.Subscribe); err != nil {
return err
}
}
@@ -157,14 +149,10 @@ func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) e
return nil
}
domain, channel, subtopic, err := messaging.ParsePublishTopic(*topic)
domainID, channelID, subtopic, err := h.parser.ParsePublishTopic(ctx, *topic, true)
if err != nil {
return errors.Wrap(errFailedPublish, err)
}
domainID, channelID, err := h.resolver.Resolve(ctx, domain, channel)
if err != nil {
return mgate.NewHTTPProxyError(http.StatusBadRequest, errors.Wrap(errFailedPublish, err))
}
msg := messaging.Message{
Protocol: protocol,