NOISSUE - Use RabbitMQ as MQTT broker (#2695)

Signed-off-by: Rodney Osodo <socials@rodneyosodo.com>
This commit is contained in:
b1ackd0t
2025-02-26 11:45:59 +03:00
committed by GitHub
parent df0f9068d6
commit 56829c15d8
22 changed files with 1336 additions and 144 deletions
+1 -1
View File
@@ -23,7 +23,7 @@ jobs:
CHECK=""
for file in $(grep -rl --exclude-dir={.git,build} \
--exclude=\*.{crt,key,pem,zed,hcl,md,json,csv,mod,sum,tmpl,args} \
--exclude={CODEOWNERS,LICENSE,MAINTAINERS} \
--exclude={CODEOWNERS,LICENSE,MAINTAINERS,enabled_plugins,rabbitmq.conf} \
.); do
if ! head -n 5 "$file" | grep -q "Copyright (c) Abstract Machines"; then
+83 -16
View File
@@ -26,6 +26,7 @@ import (
smqlog "github.com/absmach/supermq/logger"
"github.com/absmach/supermq/mqtt"
"github.com/absmach/supermq/mqtt/events"
mqtttracing "github.com/absmach/supermq/mqtt/tracing"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/grpcclient"
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
@@ -33,10 +34,12 @@ import (
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
msgevents "github.com/absmach/supermq/pkg/messaging/events"
"github.com/absmach/supermq/pkg/messaging/handler"
mqttpub "github.com/absmach/supermq/pkg/messaging/mqtt"
"github.com/absmach/supermq/pkg/server"
"github.com/absmach/supermq/pkg/uuid"
"github.com/caarlos0/env/v11"
"github.com/cenkalti/backoff/v4"
"github.com/eclipse/paho.mqtt.golang/packets"
"golang.org/x/sync/errgroup"
)
@@ -48,21 +51,26 @@ const (
)
type config struct {
LogLevel string `env:"SMQ_MQTT_ADAPTER_LOG_LEVEL" envDefault:"info"`
MQTTPort string `env:"SMQ_MQTT_ADAPTER_MQTT_PORT" envDefault:"1883"`
MQTTTargetHost string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST" envDefault:"localhost"`
MQTTTargetPort string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT" envDefault:"1883"`
MQTTTargetHealthCheck string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envDefault:""`
HTTPPort string `env:"SMQ_MQTT_ADAPTER_WS_PORT" envDefault:"8080"`
HTTPTargetHost string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_HOST" envDefault:"localhost"`
HTTPTargetPort string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_PORT" envDefault:"8080"`
Instance string `env:"SMQ_MQTT_ADAPTER_INSTANCE" envDefault:""`
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_MQTT_ADAPTER_INSTANCE_ID" envDefault:""`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
LogLevel string `env:"SMQ_MQTT_ADAPTER_LOG_LEVEL" envDefault:"info"`
MQTTPort string `env:"SMQ_MQTT_ADAPTER_MQTT_PORT" envDefault:"1883"`
MQTTTargetHost string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST" envDefault:"localhost"`
MQTTTargetPort string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT" envDefault:"1883"`
MQTTTargetUsername string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_USERNAME" envDefault:""`
MQTTTargetPassword string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_PASSWORD" envDefault:""`
MQTTForwarderTimeout time.Duration `env:"SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT" envDefault:"30s"`
MQTTTargetHealthCheck string `env:"SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envDefault:""`
MQTTQoS uint8 `env:"SMQ_MQTT_ADAPTER_MQTT_QOS" envDefault:"1"`
HTTPPort string `env:"SMQ_MQTT_ADAPTER_WS_PORT" envDefault:"8080"`
HTTPTargetHost string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_HOST" envDefault:"localhost"`
HTTPTargetPort string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_PORT" envDefault:"8080"`
HTTPTargetPath string `env:"SMQ_MQTT_ADAPTER_WS_TARGET_PATH" envDefault:"/mqtt"`
Instance string `env:"SMQ_MQTT_ADAPTER_INSTANCE" envDefault:""`
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_MQTT_ADAPTER_INSTANCE_ID" envDefault:""`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
}
func main() {
@@ -121,6 +129,38 @@ func main() {
}()
tracer := tp.Tracer(svcName)
bsub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
defer bsub.Close()
bsub = brokerstracing.NewPubSub(serverConfig, tracer, bsub)
mpub, err := mqttpub.NewPublisher(fmt.Sprintf("mqtt://%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort), cfg.MQTTTargetUsername, cfg.MQTTTargetPassword, cfg.MQTTQoS, cfg.MQTTForwarderTimeout)
if err != nil {
logger.Error(fmt.Sprintf("failed to create MQTT publisher: %s", err))
exitCode = 1
return
}
defer mpub.Close()
mpub, err = msgevents.NewPublisherMiddleware(ctx, mpub, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to create event store middleware: %s", err))
exitCode = 1
return
}
fwd := mqtt.NewForwarder(brokers.SubjectAllChannels, logger)
fwd = mqtttracing.New(serverConfig, tracer, fwd, brokers.SubjectAllChannels)
if err := fwd.Forward(ctx, svcName, bsub, mpub); err != nil {
logger.Error(fmt.Sprintf("failed to forward message broker messages: %s", err))
exitCode = 1
return
}
np, err := brokers.NewPublisher(ctx, cfg.BrokerURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
@@ -185,7 +225,10 @@ func main() {
go chc.CallHome(ctx)
}
var interceptor session.Interceptor
interceptor := interceptor{
username: cfg.MQTTTargetUsername,
password: cfg.MQTTTargetPassword,
}
logger.Info(fmt.Sprintf("Starting MQTT proxy on port %s", cfg.MQTTPort))
g.Go(func() error {
return proxyMQTT(ctx, cfg, logger, h, interceptor)
@@ -281,3 +324,27 @@ func stopSignalHandler(ctx context.Context, cancel context.CancelFunc, logger *s
return nil
}
}
type interceptor struct {
username string
password string
}
// This interceptor adds the correct credentials to upstream MQTT broker since the downstream clients
// are authenticated to the MQTT adapter but not upstream MQTT broker.
func (ic interceptor) Intercept(ctx context.Context, pkt packets.ControlPacket, dir session.Direction) (packets.ControlPacket, error) {
if connectPkt, ok := pkt.(*packets.ConnectPacket); ok {
if ic.username != "" {
connectPkt.Username = ic.username
connectPkt.UsernameFlag = true
}
if ic.password != "" {
connectPkt.Password = []byte(ic.password)
connectPkt.PasswordFlag = true
}
return connectPkt, nil
}
return pkt, nil
}
+22 -22
View File
@@ -6,27 +6,27 @@ SuperMQ CoAP adapter provides an [CoAP](http://coap.technology/) API for sending
The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values.
| Variable | Description | Default |
| --------------------------------- | ----------------------------------------------------------------------------------- | --------------------------------- |
| SMQ_COAP_ADAPTER_LOG_LEVEL | Log level for the CoAP Adapter (debug, info, warn, error) | info |
| SMQ_COAP_ADAPTER_HOST | CoAP service listening host | "" |
| SMQ_COAP_ADAPTER_PORT | CoAP service listening port | 5683 |
| SMQ_COAP_ADAPTER_SERVER_CERT | CoAP service server certificate | "" |
| SMQ_COAP_ADAPTER_SERVER_KEY | CoAP service server key | "" |
| SMQ_COAP_ADAPTER_HTTP_HOST | Service HTTP listening host | "" |
| SMQ_COAP_ADAPTER_HTTP_PORT | Service listening port | 5683 |
| SMQ_COAP_ADAPTER_HTTP_SERVER_CERT | Service server certificate | "" |
| SMQ_COAP_ADAPTER_HTTP_SERVER_KEY | Service server key | "" |
| SMQ_CLIENTS_GRPC_URL | Clients service Auth gRPC URL | <localhost:7000> |
| SMQ_CLIENTS_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s |
| SMQ_CLIENTS_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" |
| SMQ_CLIENTS_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" |
| SMQ_CLIENTS_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" |
| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | <nats://localhost:4222> |
| SMQ_JAEGER_URL | Jaeger server URL | <http://localhost:4318/v1/traces> |
| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 |
| SMQ_SEND_TELEMETRY | Send telemetry to magistrala call home server | true |
| SMQ_COAP_ADAPTER_INSTANCE_ID | CoAP adapter instance ID | "" |
| Variable | Description | Default |
| --------------------------------- | ----------------------------------------------------------------------------------- | ----------------------------------- |
| SMQ_COAP_ADAPTER_LOG_LEVEL | Log level for the CoAP Adapter (debug, info, warn, error) | info |
| SMQ_COAP_ADAPTER_HOST | CoAP service listening host | "" |
| SMQ_COAP_ADAPTER_PORT | CoAP service listening port | 5683 |
| SMQ_COAP_ADAPTER_SERVER_CERT | CoAP service server certificate | "" |
| SMQ_COAP_ADAPTER_SERVER_KEY | CoAP service server key | "" |
| SMQ_COAP_ADAPTER_HTTP_HOST | Service HTTP listening host | "" |
| SMQ_COAP_ADAPTER_HTTP_PORT | Service listening port | 5683 |
| SMQ_COAP_ADAPTER_HTTP_SERVER_CERT | Service server certificate | "" |
| SMQ_COAP_ADAPTER_HTTP_SERVER_KEY | Service server key | "" |
| SMQ_CLIENTS_GRPC_URL | Clients service Auth gRPC URL | <localhost:7000> |
| SMQ_CLIENTS_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s |
| SMQ_CLIENTS_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" |
| SMQ_CLIENTS_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" |
| SMQ_CLIENTS_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" |
| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | <amqp://guest:guest@rabbitmq:5672/> |
| SMQ_JAEGER_URL | Jaeger server URL | <http://localhost:4318/v1/traces> |
| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 |
| SMQ_SEND_TELEMETRY | Send telemetry to magistrala call home server | true |
| SMQ_COAP_ADAPTER_INSTANCE_ID | CoAP adapter instance ID | "" |
## Deployment
@@ -62,7 +62,7 @@ SMQ_CLIENTS_GRPC_TIMEOUT=1s \
SMQ_CLIENTS_GRPC_CLIENT_CERT="" \
SMQ_CLIENTS_GRPC_CLIENT_KEY="" \
SMQ_CLIENTS_GRPC_SERVER_CERTS="" \
SMQ_MESSAGE_BROKER_URL=nats://localhost:4222 \
SMQ_MESSAGE_BROKER_URL=amqp://guest:guest@rabbitmq:5672/ \
SMQ_JAEGER_URL=http://localhost:14268/api/traces \
SMQ_JAEGER_TRACE_RATIO=1.0 \
SMQ_SEND_TELEMETRY=true \
+11 -3
View File
@@ -20,6 +20,7 @@ SMQ_NATS_URL=nats://nats:${SMQ_NATS_PORT}
# Configs for nats as MQTT broker
SMQ_NATS_HEALTH_CHECK=http://nats:${SMQ_NATS_HTTP_PORT}/healthz
SMQ_NATS_WS_TARGET_PATH=
SMQ_NATS_MQTT_QOS=0
## RabbitMQ
SMQ_RABBITMQ_PORT=5672
@@ -29,19 +30,25 @@ SMQ_RABBITMQ_PASS=supermq
SMQ_RABBITMQ_COOKIE=supermq
SMQ_RABBITMQ_VHOST=/
SMQ_RABBITMQ_URL=amqp://${SMQ_RABBITMQ_USER}:${SMQ_RABBITMQ_PASS}@rabbitmq:${SMQ_RABBITMQ_PORT}${SMQ_RABBITMQ_VHOST}
SMQ_RABBITMQ_MQTT_QOS=0
SMQ_RABBITMQ_WS_TARGET_PATH=/
## Message Broker
SMQ_MESSAGE_BROKER_TYPE=nats
SMQ_MESSAGE_BROKER_URL=${SMQ_NATS_URL}
## MQTT Broker
SMQ_MQTT_BROKER_TYPE=nats
SMQ_MQTT_BROKER_HEALTH_CHECK=${SMQ_NATS_HEALTH_CHECK}
SMQ_MQTT_BROKER_TYPE=rabbitmq
SMQ_MQTT_BROKER_HEALTH_CHECK=
SMQ_MQTT_ADAPTER_MQTT_QOS=${SMQ_RABBITMQ_MQTT_QOS}
SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE}
SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT=1883
SMQ_MQTT_ADAPTER_MQTT_TARGET_USERNAME=${SMQ_RABBITMQ_USER}
SMQ_MQTT_ADAPTER_MQTT_TARGET_PASSWORD=${SMQ_RABBITMQ_PASS}
SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK=${SMQ_MQTT_BROKER_HEALTH_CHECK}
SMQ_MQTT_ADAPTER_WS_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE}
SMQ_MQTT_ADAPTER_WS_TARGET_PORT=8080
SMQ_MQTT_ADAPTER_WS_TARGET_PATH=${SMQ_RABBITMQ_WS_TARGET_PATH}
## Redis
SMQ_REDIS_TCP_PORT=6379
@@ -60,7 +67,7 @@ SMQ_JAEGER_TRACE_RATIO=1.0
SMQ_JAEGER_MEMORY_MAX_TRACES=5000
## Call home
SMQ_SEND_TELEMETRY=false
SMQ_SEND_TELEMETRY=true
## Postgres
SMQ_POSTGRES_MAX_CONNECTIONS=100
@@ -327,6 +334,7 @@ SMQ_HTTP_ADAPTER_INSTANCE_ID=
### MQTT
SMQ_MQTT_ADAPTER_LOG_LEVEL=debug
SMQ_MQTT_ADAPTER_MQTT_PORT=1883
SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT=30s
SMQ_MQTT_ADAPTER_WS_PORT=8080
SMQ_MQTT_ADAPTER_INSTANCE=
SMQ_MQTT_ADAPTER_INSTANCE_ID=
+27 -9
View File
@@ -26,21 +26,25 @@ To pull docker images from a specific release you need to change the value of `S
SuperMQ supports configurable MQTT broker and Message broker, which also acts as an events store. SuperMQ uses two types of brokers:
1. MQTT_BROKER: Handles MQTT communication between MQTT adapters and message broker. This is NATS.
1. MQTT_BROKER: Handles MQTT communication between MQTT adapters and message broker. This can either be 'RabbitMQ' or 'NATS'.
2. MESSAGE_BROKER: Manages message exchange between SuperMQ core, optional, and external services. This can either be 'NATS' or 'RabbitMQ'. This is used to store messages for distributed processing.
Events store: This is used by SuperMQ services to store events for distributed processing. SuperMQ uses a single service to be the message broker and events store. This can either be 'NATS' or 'RabbitMQ'. Redis can also be used as an events store, but it requires a message broker to be deployed along with it for message exchange.
This is the same as MESSAGE_BROKER. This can either be 'NATS' or 'RabbitMQ' or 'Redis'. If Redis is used as an events store, then RabbitMQ or NATS is used as a message broker.
This is the same as MESSAGE_BROKER. This can either be 'NATS' or 'RabbitMQ' or 'Redis'. If Redis is used as an events store, then RabbitMQ or NATS is used as a message broker.
The current deployment strategy for SuperMQ in `docker/docker-compose.yml` is to use NATS as a MQTT_BROKER, MESSAGE_BROKER and EVENTS_STORE.
The current deployment strategy for SuperMQ in `docker/docker-compose.yml` is to use RabbitMQ as a MQTT_BROKER and NATS as a MESSAGE_BROKER and EVENTS_STORE.
Therefore, the following combinations are possible:
- MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: RabbitMQ
- MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: Redis
- MESSAGE_BROKER: NATS, EVENTS_STORE: NATS
- MESSAGE_BROKER: NATS, EVENTS_STORE: Redis
- MQTT_BROKER: RabbitMQ, MESSAGE_BROKER: NATS, EVENTS_STORE: NATS
- MQTT_BROKER: RabbitMQ, MESSAGE_BROKER: NATS, EVENTS_STORE: Redis
- MQTT_BROKER: RabbitMQ, MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: RabbitMQ
- MQTT_BROKER: RabbitMQ, MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: Redis
- MQTT_BROKER: NATS, MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: RabbitMQ
- MQTT_BROKER: NATS, MESSAGE_BROKER: RabbitMQ, EVENTS_STORE: Redis
- MQTT_BROKER: NATS, MESSAGE_BROKER: NATS, EVENTS_STORE: NATS
- MQTT_BROKER: NATS, MESSAGE_BROKER: NATS, EVENTS_STORE: Redis
For Message brokers other than NATS, you would need to build the docker images with RabbitMQ as the build tag and change the `docker/.env`. For example, to use RabbitMQ as a message broker:
@@ -66,6 +70,20 @@ SMQ_ES_TYPE=redis
SMQ_ES_URL=${SMQ_REDIS_URL}
```
For MQTT broker other than RabbitMQ, you would need to change the `docker/.env`. For example, to use NATS as a MQTT broker:
```env
SMQ_MQTT_BROKER_TYPE=nats
SMQ_MQTT_BROKER_HEALTH_CHECK=${SMQ_NATS_HEALTH_CHECK}
SMQ_MQTT_ADAPTER_MQTT_QOS=${SMQ_NATS_MQTT_QOS}
SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE}
SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT=1883
SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK=${SMQ_MQTT_BROKER_HEALTH_CHECK}
SMQ_MQTT_ADAPTER_WS_TARGET_HOST=${SMQ_MQTT_BROKER_TYPE}
SMQ_MQTT_ADAPTER_WS_TARGET_PORT=8080
SMQ_MQTT_ADAPTER_WS_TARGET_PATH=${SMQ_NATS_WS_TARGET_PATH}
```
### RabbitMQ configuration
```yaml
@@ -107,9 +125,9 @@ By using environment variables file at `docker/.env` you can modify the below gi
`SMQ_NGINX_SERVER_NAME` environmental variable is used to configure nginx directive `server_name`. If environmental variable `SMQ_NGINX_SERVER_NAME` is empty then default value `localhost` will set to `server_name`.
`SMQ_NGINX_SERVER_CERT` environmental variable is used to configure nginx directive `ssl_certificate`. If environmental variable `SMQ_NGINX_SERVER_CERT` is empty then by default server certificate in the path `docker/ssl/certs/supermq-server.crt` will be assigned.
`SMQ_NGINX_SERVER_CERT` environmental variable is used to configure nginx directive `ssl_certificate`. If environmental variable `SMQ_NGINX_SERVER_CERT` is empty then by default server certificate in the path `docker/ssl/certs/supermq-server.crt` will be assigned.
`SMQ_NGINX_SERVER_KEY` environmental variable is used to configure nginx directive `ssl_certificate_key`. If environmental variable `SMQ_NGINX_SERVER_KEY` is empty then by default server certificate key in the path `docker/ssl/certs/supermq-server.key` will be assigned.
`SMQ_NGINX_SERVER_KEY` environmental variable is used to configure nginx directive `ssl_certificate_key`. If environmental variable `SMQ_NGINX_SERVER_KEY` is empty then by default server certificate key in the path `docker/ssl/certs/supermq-server.key` will be assigned.
`SMQ_NGINX_SERVER_CLIENT_CA` environmental variable is used to configure nginx directive `ssl_client_certificate`. If environmental variable `SMQ_NGINX_SERVER_CLIENT_CA` is empty then by default certificate in the path `docker/ssl/certs/ca.crt` will be assigned.
+29 -2
View File
@@ -14,6 +14,7 @@ volumes:
supermq-channels-db-volume:
supermq-clients-redis-volume:
supermq-broker-volume:
supermq-mqtt-broker-volume:
supermq-spicedb-db-volume:
supermq-auth-db-volume:
supermq-pat-db-volume:
@@ -869,7 +870,7 @@ services:
create_host_path: true
jaeger:
image: jaegertracing/all-in-one:1.60
image: jaegertracing/all-in-one:1.66.0
container_name: supermq-jaeger
environment:
COLLECTOR_OTLP_ENABLED: ${SMQ_JAEGER_COLLECTOR_OTLP_ENABLED}
@@ -885,6 +886,7 @@ services:
container_name: supermq-mqtt
depends_on:
- clients
- rabbitmq
- nats
restart: on-failure
environment:
@@ -892,11 +894,16 @@ services:
SMQ_MQTT_ADAPTER_MQTT_PORT: ${SMQ_MQTT_ADAPTER_MQTT_PORT}
SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST: ${SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST}
SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT: ${SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT}
SMQ_MQTT_ADAPTER_MQTT_TARGET_USERNAME: ${SMQ_MQTT_ADAPTER_MQTT_TARGET_USERNAME}
SMQ_MQTT_ADAPTER_MQTT_TARGET_PASSWORD: ${SMQ_MQTT_ADAPTER_MQTT_TARGET_PASSWORD}
SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT: ${SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT}
SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK: ${SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK}
SMQ_MQTT_ADAPTER_MQTT_QOS: ${SMQ_MQTT_ADAPTER_MQTT_QOS}
SMQ_MQTT_ADAPTER_WS_PORT: ${SMQ_MQTT_ADAPTER_WS_PORT}
SMQ_MQTT_ADAPTER_INSTANCE_ID: ${SMQ_MQTT_ADAPTER_INSTANCE_ID}
SMQ_MQTT_ADAPTER_WS_TARGET_HOST: ${SMQ_MQTT_ADAPTER_WS_TARGET_HOST}
SMQ_MQTT_ADAPTER_WS_TARGET_PORT: ${SMQ_MQTT_ADAPTER_WS_TARGET_PORT}
SMQ_MQTT_ADAPTER_WS_TARGET_PATH: ${SMQ_MQTT_ADAPTER_WS_TARGET_PATH}
SMQ_MQTT_ADAPTER_INSTANCE: ${SMQ_MQTT_ADAPTER_INSTANCE}
SMQ_ES_URL: ${SMQ_ES_URL}
SMQ_CLIENTS_GRPC_URL: ${SMQ_CLIENTS_GRPC_URL}
@@ -1202,8 +1209,28 @@ services:
bind:
create_host_path: true
rabbitmq:
image: rabbitmq:4.0.5-alpine
container_name: supermq-rabbitmq
restart: on-failure
environment:
RABBITMQ_ERLANG_COOKIE: ${SMQ_RABBITMQ_COOKIE}
RABBITMQ_DEFAULT_USER: ${SMQ_RABBITMQ_USER}
RABBITMQ_DEFAULT_PASS: ${SMQ_RABBITMQ_PASS}
RABBITMQ_DEFAULT_VHOST: ${SMQ_RABBITMQ_VHOST}
RABBITMQ_CONFIG_FILES: /etc/rabbitmq/conf.d/
ports:
- ${SMQ_RABBITMQ_PORT}:${SMQ_RABBITMQ_PORT}
- ${SMQ_RABBITMQ_HTTP_PORT}:${SMQ_RABBITMQ_HTTP_PORT}
volumes:
- ./rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins
- ./rabbitmq/rabbitmq.conf:/etc/rabbitmq/conf.d/10-defaults.conf
- supermq-mqtt-broker-volume:/var/lib/rabbitmq
networks:
- supermq-base-net
nats:
image: nats:2.10.9-alpine
image: nats:2.10.25-alpine
container_name: supermq-nats
restart: on-failure
command: "--config=/etc/nats/nats.conf"
+1
View File
@@ -0,0 +1 @@
[rabbitmq_management,rabbitmq_mqtt].
+15
View File
@@ -0,0 +1,15 @@
## DEFAULT SETTINGS ARE NOT MEANT TO BE TAKEN STRAIGHT INTO PRODUCTION
## see https://www.rabbitmq.com/configure.html for further information
## on configuring RabbitMQ
## allow access to the guest user from anywhere on the network
## https://www.rabbitmq.com/access-control.html#loopback-users
## https://www.rabbitmq.com/production-checklist.html#users
loopback_users.guest = false
## Send all logs to stdout/TTY. Necessary to see logs when running via
## a container
log.console = true
## Enable anonymous connection
mqtt.allow_anonymous = true
+9 -10
View File
@@ -13,6 +13,7 @@ require (
github.com/authzed/spicedb v1.40.1
github.com/caarlos0/env/v11 v11.3.1
github.com/cenkalti/backoff/v4 v4.3.0
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/fatih/color v1.18.0
github.com/go-chi/chi/v5 v5.2.1
github.com/go-kit/kit v0.13.0
@@ -51,7 +52,7 @@ require (
golang.org/x/crypto v0.35.0
golang.org/x/oauth2 v0.27.0
golang.org/x/sync v0.11.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287
google.golang.org/genproto/googleapis/rpc v0.0.0-20250207221924-e9438ea467c6
google.golang.org/grpc v1.70.0
google.golang.org/protobuf v1.36.5
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
@@ -79,7 +80,6 @@ require (
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dsnet/golib/memfile v1.0.0 // indirect
github.com/eclipse/paho.mqtt.golang v1.5.0 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
@@ -96,7 +96,7 @@ require (
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/gopherjs/gopherjs v1.17.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
@@ -110,7 +110,6 @@ require (
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgx/v4 v4.18.3 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/jzelinskie/stringz v0.0.3 // indirect
@@ -126,7 +125,7 @@ require (
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/nkeys v0.4.9 // indirect
github.com/nats-io/nkeys v0.4.10 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
@@ -145,7 +144,7 @@ require (
github.com/samber/lo v1.49.1 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/smarty/assertions v1.15.0 // indirect
github.com/smarty/assertions v1.16.0 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
@@ -157,12 +156,12 @@ require (
go.opentelemetry.io/otel/metric v1.34.0 // indirect
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac // indirect
golang.org/x/net v0.35.0 // indirect
golang.org/x/sys v0.30.0 // indirect
golang.org/x/text v0.22.0 // indirect
golang.org/x/time v0.9.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250127172529-29210b9bc287 // indirect
golang.org/x/time v0.10.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250207221924-e9438ea467c6 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
+17 -18
View File
@@ -156,8 +156,8 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI=
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.0 h1:VD1gqscl4nYs1YxVuSdemTrSgTKrwOWDK0FVFMqm+Cg=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.0/go.mod h1:4EgsQoS4TOhJizV+JTFg40qx1Ofh3XmXEQNBpgvNT40=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 h1:e9Rjr40Z98/clHv5Yg79Is0NtosR5LXRvdr7o/6NwbA=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1/go.mod h1:tIxuGz/9mpox++sgp9fJjHO0+q1X9/UOWd798aAm22M=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
@@ -232,9 +232,8 @@ github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08
github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM=
github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc=
github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs=
github.com/jackc/pgx/v4 v4.18.2 h1:xVpYkNR5pk5bMCZGfClbO962UIqVABcAGt7ha1s/FeU=
github.com/jackc/pgx/v4 v4.18.2/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw=
github.com/jackc/pgx/v4 v4.18.3 h1:dE2/TrEsGX3RBprb3qryqSV9Y60iZN1C6i8IrmW9/BA=
github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw=
github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI=
github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
@@ -309,8 +308,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nats-io/nats.go v1.39.1 h1:oTkfKBmz7W047vRxV762M67ZdXeOtUgvbBaNoQ+3PPk=
github.com/nats-io/nats.go v1.39.1/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM=
github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0=
github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE=
github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc=
github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
@@ -383,8 +382,8 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/smarty/assertions v1.15.0 h1:cR//PqUBUiQRakZWqBiFFQ9wb8emQGDb0HeGdqGByCY=
github.com/smarty/assertions v1.15.0/go.mod h1:yABtdzeQs6l1brC900WlRNwj6ZR55d7B+E8C6HtKdec=
github.com/smarty/assertions v1.16.0 h1:EvHNkdRA4QHMrn75NZSoUQ/mAUXAYWfatfB01yTCzfY=
github.com/smarty/assertions v1.16.0/go.mod h1:duaaFdCS0K9dnoM50iyek/eYINOZ64gbh1Xlf6LG7AI=
github.com/smartystreets/goconvey v1.8.1 h1:qGjIddxOk4grTu9JPOU31tVfq3cNdBlNa5sSznIX1xY=
github.com/smartystreets/goconvey v1.8.1/go.mod h1:+/u4qLyY6x1jReYOp7GOM2FSt8aP9CzCZL03bI28W60=
github.com/spf13/cobra v1.9.1 h1:CXSaggrXdbHK9CF+8ywj8Amf7PBRmPCOJugH954Nnlo=
@@ -484,8 +483,8 @@ golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZP
golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs=
golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c h1:KL/ZBHXgKGVmuZBZ01Lt57yE5ws8ZPSkkihmEyq7FXc=
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU=
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac h1:l5+whBCLH3iH2ZNHYLbAe58bo7yrN4mVcnkHDYz5vvs=
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac/go.mod h1:hH+7mtFmImwwcMvScyxUhjuVHR3HGaDPMn9rMSUUbxo=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
@@ -510,8 +509,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M=
golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8=
@@ -568,8 +567,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY=
golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/time v0.10.0 h1:3usCWA8tQn0L8+hFJQNgzpWbd89begxN66o1Ojdn5L4=
golang.org/x/time v0.10.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
@@ -598,10 +597,10 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto/googleapis/api v0.0.0-20250127172529-29210b9bc287 h1:A2ni10G3UlplFrWdCDJTl7D7mJ7GSRm37S+PDimaKRw=
google.golang.org/genproto/googleapis/api v0.0.0-20250127172529-29210b9bc287/go.mod h1:iYONQfRdizDB8JJBybql13nArx91jcUk7zCXEsOofM4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287 h1:J1H9f+LEdWAfHcez/4cvaVBox7cOYT+IU6rgqj5x++8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287/go.mod h1:8BS3B93F/U1juMFq9+EDk+qOT5CO1R9IzXxG3PTqiRk=
google.golang.org/genproto/googleapis/api v0.0.0-20250207221924-e9438ea467c6 h1:L9JNMl/plZH9wmzQUHleO/ZZDSN+9Gh41wPczNy+5Fk=
google.golang.org/genproto/googleapis/api v0.0.0-20250207221924-e9438ea467c6/go.mod h1:iYONQfRdizDB8JJBybql13nArx91jcUk7zCXEsOofM4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250207221924-e9438ea467c6 h1:2duwAxN2+k0xLNpjnHTXoMUgnv6VPSp5fiqTuwSxjmI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250207221924-e9438ea467c6/go.mod h1:8BS3B93F/U1juMFq9+EDk+qOT5CO1R9IzXxG3PTqiRk=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
+18 -18
View File
@@ -6,23 +6,23 @@ HTTP adapter provides an HTTP API for sending messages through the platform.
The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values.
| Variable | Description | Default |
| ----------------------------- | ----------------------------------------------------------------------------------- | --------------------------------- |
| SMQ_HTTP_ADAPTER_LOG_LEVEL | Log level for the HTTP Adapter (debug, info, warn, error) | info |
| SMQ_HTTP_ADAPTER_HOST | Service HTTP host | "" |
| SMQ_HTTP_ADAPTER_PORT | Service HTTP port | 80 |
| SMQ_HTTP_ADAPTER_SERVER_CERT | Path to the PEM encoded server certificate file | "" |
| SMQ_HTTP_ADAPTER_SERVER_KEY | Path to the PEM encoded server key file | "" |
| SMQ_CLIENTS_GRPC_URL | Clients service Auth gRPC URL | <localhost:7000> |
| SMQ_CLIENTS_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s |
| SMQ_CLIENTS_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" |
| SMQ_CLIENTS_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" |
| SMQ_CLIENTS_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" |
| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | <nats://localhost:4222> |
| SMQ_JAEGER_URL | Jaeger server URL | <http://localhost:4318/v1/traces> |
| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 |
| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true |
| SMQ_HTTP_ADAPTER_INSTANCE_ID | Service instance ID | "" |
| Variable | Description | Default |
| ----------------------------- | ----------------------------------------------------------------------------------- | ----------------------------------- |
| SMQ_HTTP_ADAPTER_LOG_LEVEL | Log level for the HTTP Adapter (debug, info, warn, error) | info |
| SMQ_HTTP_ADAPTER_HOST | Service HTTP host | "" |
| SMQ_HTTP_ADAPTER_PORT | Service HTTP port | 80 |
| SMQ_HTTP_ADAPTER_SERVER_CERT | Path to the PEM encoded server certificate file | "" |
| SMQ_HTTP_ADAPTER_SERVER_KEY | Path to the PEM encoded server key file | "" |
| SMQ_CLIENTS_GRPC_URL | Clients service Auth gRPC URL | <localhost:7000> |
| SMQ_CLIENTS_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s |
| SMQ_CLIENTS_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" |
| SMQ_CLIENTS_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" |
| SMQ_CLIENTS_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" |
| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | <amqp://guest:guest@rabbitmq:5672/> |
| SMQ_JAEGER_URL | Jaeger server URL | <http://localhost:4318/v1/traces> |
| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 |
| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true |
| SMQ_HTTP_ADAPTER_INSTANCE_ID | Service instance ID | "" |
## Deployment
@@ -54,7 +54,7 @@ SMQ_CLIENTS_GRPC_TIMEOUT=1s \
SMQ_CLIENTS_GRPC_CLIENT_CERT="" \
SMQ_CLIENTS_GRPC_CLIENT_KEY="" \
SMQ_CLIENTS_GRPC_SERVER_CERTS="" \
SMQ_MESSAGE_BROKER_URL=nats://localhost:4222 \
SMQ_MESSAGE_BROKER_URL=amqp://guest:guest@rabbitmq:5672/ \
SMQ_JAEGER_URL=http://localhost:14268/api/traces \
SMQ_JAEGER_TRACE_RATIO=1.0 \
SMQ_SEND_TELEMETRY=true \
+30 -24
View File
@@ -6,28 +6,31 @@ MQTT adapter provides an MQTT API for sending messages through the platform. MQT
The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values.
| Variable | Description | Default |
| ----------------------------------------- | ----------------------------------------------------------------------------------- | --------------------------------- |
| SMQ_MQTT_ADAPTER_LOG_LEVEL | Log level for the MQTT Adapter (debug, info, warn, error) | info |
| SMQ_MQTT_ADAPTER_MQTT_PORT | mProxy port | 1883 |
| SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST | MQTT broker host | localhost |
| SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT | MQTT broker port | 1883 |
| SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK | URL of broker health check | "" |
| SMQ_MQTT_ADAPTER_WS_PORT | mProxy MQTT over WS port | 8080 |
| SMQ_MQTT_ADAPTER_WS_TARGET_HOST | MQTT broker host for MQTT over WS | localhost |
| SMQ_MQTT_ADAPTER_WS_TARGET_PORT | MQTT broker port for MQTT over WS | 8080 |
| SMQ_MQTT_ADAPTER_INSTANCE | Instance name for MQTT adapter | "" |
| SMQ_CLIENTS_GRPC_URL | Clients service Auth gRPC URL | <localhost:7000> |
| SMQ_CLIENTS_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s |
| SMQ_CLIENTS_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" |
| SMQ_CLIENTS_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" |
| SMQ_CLIENTS_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" |
| SMQ_ES_URL | Event sourcing URL | <nats://localhost:4222> |
| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | <nats://localhost:4222> |
| SMQ_JAEGER_URL | Jaeger server URL | <http://localhost:4318/v1/traces> |
| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 |
| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true |
| SMQ_MQTT_ADAPTER_INSTANCE_ID | Service instance ID | "" |
| Variable | Description | Default |
| ----------------------------------------- | ----------------------------------------------------------------------------------- | ----------------------------------- |
| SMQ_MQTT_ADAPTER_LOG_LEVEL | Log level for the MQTT Adapter (debug, info, warn, error) | info |
| SMQ_MQTT_ADAPTER_MQTT_PORT | mProxy port | 1883 |
| SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST | MQTT broker host | localhost |
| SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT | MQTT broker port | 1883 |
| SMQ_MQTT_ADAPTER_MQTT_QOS | MQTT broker QoS | 1 |
| SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT | MQTT forwarder for multiprotocol communication timeout | 30s |
| SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK | URL of broker health check | "" |
| SMQ_MQTT_ADAPTER_WS_PORT | mProxy MQTT over WS port | 8080 |
| SMQ_MQTT_ADAPTER_WS_TARGET_HOST | MQTT broker host for MQTT over WS | localhost |
| SMQ_MQTT_ADAPTER_WS_TARGET_PORT | MQTT broker port for MQTT over WS | 8080 |
| SMQ_MQTT_ADAPTER_WS_TARGET_PATH | MQTT broker MQTT over WS path | /mqtt |
| SMQ_MQTT_ADAPTER_INSTANCE | Instance name for MQTT adapter | "" |
| SMQ_CLIENTS_GRPC_URL | Clients service Auth gRPC URL | <localhost:7000> |
| SMQ_CLIENTS_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s |
| SMQ_CLIENTS_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" |
| SMQ_CLIENTS_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" |
| SMQ_CLIENTS_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" |
| SMQ_ES_URL | Event sourcing URL | <amqp://guest:guest@rabbitmq:5672/> |
| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | <amqp://guest:guest@rabbitmq:5672/> |
| SMQ_JAEGER_URL | Jaeger server URL | <http://localhost:4318/v1/traces> |
| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 |
| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true |
| SMQ_MQTT_ADAPTER_INSTANCE_ID | Service instance ID | "" |
## Deployment
@@ -53,18 +56,21 @@ SMQ_MQTT_ADAPTER_LOG_LEVEL=info \
SMQ_MQTT_ADAPTER_MQTT_PORT=1883 \
SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST=localhost \
SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT=1883 \
SMQ_MQTT_ADAPTER_MQTT_QOS=1 \
SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT=30s \
SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK="" \
SMQ_MQTT_ADAPTER_WS_PORT=8080 \
SMQ_MQTT_ADAPTER_WS_TARGET_HOST=localhost \
SMQ_MQTT_ADAPTER_WS_TARGET_PORT=8080 \
SMQ_MQTT_ADAPTER_WS_TARGET_PATH=/mqtt \
SMQ_MQTT_ADAPTER_INSTANCE="" \
SMQ_CLIENTS_GRPC_URL=localhost:7000 \
SMQ_CLIENTS_GRPC_TIMEOUT=1s \
SMQ_CLIENTS_GRPC_CLIENT_CERT="" \
SMQ_CLIENTS_GRPC_CLIENT_KEY="" \
SMQ_CLIENTS_GRPC_SERVER_CERTS="" \
SMQ_ES_URL=nats://localhost:4222 \
SMQ_MESSAGE_BROKER_URL=nats://localhost:4222 \
SMQ_ES_URL=amqp://guest:guest@rabbitmq:5672/ \
SMQ_MESSAGE_BROKER_URL=amqp://guest:guest@rabbitmq:5672/ \
SMQ_JAEGER_URL=http://localhost:14268/api/traces \
SMQ_JAEGER_TRACE_RATIO=1.0 \
SMQ_SEND_TELEMETRY=true \
+75
View File
@@ -0,0 +1,75 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package mqtt
import (
"context"
"fmt"
"log/slog"
"strings"
"github.com/absmach/supermq/pkg/messaging"
)
// Forwarder specifies MQTT forwarder interface API.
type Forwarder interface {
// Forward subscribes to the Subscriber and
// publishes messages using provided Publisher.
Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error
}
type forwarder struct {
topic string
logger *slog.Logger
}
// NewForwarder returns new Forwarder implementation.
func NewForwarder(topic string, logger *slog.Logger) Forwarder {
return forwarder{
topic: topic,
logger: logger,
}
}
func (f forwarder) Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error {
subCfg := messaging.SubscriberConfig{
ID: id,
Topic: f.topic,
Handler: handle(ctx, pub, f.logger),
}
return sub.Subscribe(ctx, subCfg)
}
func handle(ctx context.Context, pub messaging.Publisher, logger *slog.Logger) handleFunc {
return func(msg *messaging.Message) error {
if msg.GetProtocol() == protocol {
return nil
}
// Use concatenation instead of fmt.Sprintf for the
// sake of simplicity and performance.
topic := "channels/" + msg.GetChannel() + "/messages"
if msg.GetSubtopic() != "" {
topic = topic + "/" + strings.ReplaceAll(msg.GetSubtopic(), ".", "/")
}
go func() {
if err := pub.Publish(ctx, topic, msg); err != nil {
logger.Warn(fmt.Sprintf("Failed to forward message: %s", err))
}
}()
return nil
}
}
type handleFunc func(msg *messaging.Message) error
func (h handleFunc) Handle(msg *messaging.Message) error {
return h(msg)
}
func (h handleFunc) Cancel() error {
return nil
}
+12
View File
@@ -0,0 +1,12 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package tracing provides tracing instrumentation for SuperMQ MQTT adapter service.
//
// This package provides tracing middleware for SuperMQ MQTT adapter service.
// It can be used to trace incoming requests and add tracing capabilities to
// SuperMQ MQTT adapter service.
//
// For more details about tracing instrumentation for SuperMQ messaging refer
// to the documentation at https://docs.supermq.abstractmachines.fr/tracing/.
package tracing
+63
View File
@@ -0,0 +1,63 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package tracing
import (
"context"
"fmt"
"github.com/absmach/supermq/mqtt"
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/server"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
const forwardOP = "process"
var _ mqtt.Forwarder = (*forwarderMiddleware)(nil)
type forwarderMiddleware struct {
topic string
forwarder mqtt.Forwarder
tracer trace.Tracer
host server.Config
}
// New creates new mqtt forwarder tracing middleware.
func New(config server.Config, tracer trace.Tracer, forwarder mqtt.Forwarder, topic string) mqtt.Forwarder {
return &forwarderMiddleware{
forwarder: forwarder,
tracer: tracer,
topic: topic,
host: config,
}
}
// Forward traces mqtt forward operations.
func (fm *forwarderMiddleware) Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error {
subject := fmt.Sprintf("channels.%s.messages", fm.topic)
spanName := fmt.Sprintf("%s %s", subject, forwardOP)
ctx, span := fm.tracer.Start(ctx,
spanName,
trace.WithAttributes(
attribute.String("messaging.system", "mqtt"),
attribute.Bool("messaging.destination.anonymous", false),
attribute.String("messaging.destination.template", "channels/{channelID}/messages/*"),
attribute.Bool("messaging.destination.temporary", true),
attribute.String("network.protocol.name", "mqtt"),
attribute.String("network.protocol.version", "3.1.1"),
attribute.String("network.transport", "tcp"),
attribute.String("network.type", "ipv4"),
attribute.String("messaging.operation", forwardOP),
attribute.String("messaging.client_id", id),
attribute.String("server.address", fm.host.Host),
attribute.String("server.socket.port", fm.host.Port),
),
)
defer span.End()
return fm.forwarder.Forward(ctx, id, sub, pub)
}
@@ -1,9 +1,9 @@
//go:build rabbitmq
// +build rabbitmq
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
//go:build rabbitmq
// +build rabbitmq
package brokers
import (
+11
View File
@@ -0,0 +1,11 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package mqtt hold the implementation of the Publisher and PubSub
// interfaces for the MQTT messaging system, the internal messaging
// broker of the SuperMQ IoT platform. Due to the practical requirements
// implementation Publisher is created alongside PubSub. The reason for
// this is that Subscriber implementation of MQTT brings the burden of
// additional struct fields which are not used by Publisher. Subscriber
// is not implemented separately because PubSub can be used where Subscriber is needed.
package mqtt
+61
View File
@@ -0,0 +1,61 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package mqtt
import (
"context"
"errors"
"time"
"github.com/absmach/supermq/pkg/messaging"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
var errPublishTimeout = errors.New("failed to publish due to timeout reached")
var _ messaging.Publisher = (*publisher)(nil)
type publisher struct {
client mqtt.Client
timeout time.Duration
qos uint8
}
// NewPublisher returns a new MQTT message publisher.
func NewPublisher(address, username, password string, qos uint8, timeout time.Duration) (messaging.Publisher, error) {
client, err := newClient(address, username, password, "mqtt-publisher", timeout)
if err != nil {
return nil, err
}
ret := publisher{
client: client,
timeout: timeout,
qos: qos,
}
return ret, nil
}
func (pub publisher) Publish(ctx context.Context, topic string, msg *messaging.Message) error {
if topic == "" {
return ErrEmptyTopic
}
// Publish only the payload and not the whole message.
token := pub.client.Publish(topic, byte(pub.qos), false, msg.GetPayload())
if token.Error() != nil {
return token.Error()
}
if ok := token.WaitTimeout(pub.timeout); !ok {
return errPublishTimeout
}
return nil
}
func (pub publisher) Close() error {
pub.client.Disconnect(uint(pub.timeout))
return nil
}
+235
View File
@@ -0,0 +1,235 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package mqtt
import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"time"
"github.com/absmach/supermq/pkg/messaging"
mqtt "github.com/eclipse/paho.mqtt.golang"
"google.golang.org/protobuf/proto"
)
var (
// ErrConnect indicates that connection to MQTT broker failed.
ErrConnect = errors.New("failed to connect to MQTT broker")
// errSubscribeTimeout indicates that the subscription failed due to timeout.
errSubscribeTimeout = errors.New("failed to subscribe due to timeout reached")
// errUnsubscribeTimeout indicates that unsubscribe failed due to timeout.
errUnsubscribeTimeout = errors.New("failed to unsubscribe due to timeout reached")
// errUnsubscribeDeleteTopic indicates that unsubscribe failed because the topic was deleted.
errUnsubscribeDeleteTopic = errors.New("failed to unsubscribe due to deletion of topic")
// ErrNotSubscribed indicates that the topic is not subscribed to.
ErrNotSubscribed = errors.New("not subscribed")
// ErrEmptyTopic indicates the absence of topic.
ErrEmptyTopic = errors.New("empty topic")
// ErrEmptyID indicates the absence of ID.
ErrEmptyID = errors.New("empty ID")
)
var _ messaging.PubSub = (*pubsub)(nil)
type subscription struct {
client mqtt.Client
topics []string
cancel func() error
}
type pubsub struct {
publisher
logger *slog.Logger
mu sync.RWMutex
address string
username string
password string
timeout time.Duration
subscriptions map[string]subscription
}
// NewPubSub returns MQTT message publisher/subscriber.
func NewPubSub(url, username, password string, qos uint8, timeout time.Duration, logger *slog.Logger) (messaging.PubSub, error) {
client, err := newClient(url, username, password, "mqtt-publisher", timeout)
if err != nil {
return nil, err
}
ret := &pubsub{
publisher: publisher{
client: client,
timeout: timeout,
qos: qos,
},
address: url,
username: username,
password: password,
timeout: timeout,
logger: logger,
subscriptions: make(map[string]subscription),
}
return ret, nil
}
func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) error {
if cfg.ID == "" {
return ErrEmptyID
}
if cfg.Topic == "" {
return ErrEmptyTopic
}
ps.mu.Lock()
defer ps.mu.Unlock()
s, ok := ps.subscriptions[cfg.ID]
// If the client exists, check if it's subscribed to the topic and unsubscribe if needed.
switch ok {
case true:
if ok := s.contains(cfg.Topic); ok {
if err := s.unsubscribe(cfg.Topic, ps.timeout); err != nil {
return err
}
}
default:
client, err := newClient(ps.address, ps.username, ps.password, cfg.ID, ps.timeout)
if err != nil {
return err
}
s = subscription{
client: client,
topics: []string{},
cancel: cfg.Handler.Cancel,
}
}
s.topics = append(s.topics, cfg.Topic)
ps.subscriptions[cfg.ID] = s
token := s.client.Subscribe(cfg.Topic, byte(ps.qos), ps.mqttHandler(cfg.Handler))
if token.Error() != nil {
return token.Error()
}
if ok := token.WaitTimeout(ps.timeout); !ok {
return errSubscribeTimeout
}
return nil
}
func (ps *pubsub) Unsubscribe(ctx context.Context, id, topic string) error {
if id == "" {
return ErrEmptyID
}
if topic == "" {
return ErrEmptyTopic
}
ps.mu.Lock()
defer ps.mu.Unlock()
s, ok := ps.subscriptions[id]
if !ok || !s.contains(topic) {
return ErrNotSubscribed
}
if err := s.unsubscribe(topic, ps.timeout); err != nil {
return err
}
ps.subscriptions[id] = s
if len(s.topics) == 0 {
delete(ps.subscriptions, id)
}
return nil
}
func (s *subscription) unsubscribe(topic string, timeout time.Duration) error {
if s.cancel != nil {
if err := s.cancel(); err != nil {
return err
}
}
token := s.client.Unsubscribe(topic)
if token.Error() != nil {
return token.Error()
}
if ok := token.WaitTimeout(timeout); !ok {
return errUnsubscribeTimeout
}
if ok := s.delete(topic); !ok {
return errUnsubscribeDeleteTopic
}
return token.Error()
}
func newClient(address, username, password, id string, timeout time.Duration) (mqtt.Client, error) {
opts := mqtt.NewClientOptions().
SetUsername(username).
SetPassword(password).
SetConnectRetry(true).
SetAutoReconnect(true).
AddBroker(address).
SetClientID(id)
client := mqtt.NewClient(opts)
token := client.Connect()
if token.Error() != nil {
return nil, token.Error()
}
if ok := token.WaitTimeout(timeout); !ok {
return nil, ErrConnect
}
return client, nil
}
func (ps *pubsub) mqttHandler(h messaging.MessageHandler) mqtt.MessageHandler {
return func(_ mqtt.Client, m mqtt.Message) {
var msg messaging.Message
if err := proto.Unmarshal(m.Payload(), &msg); err != nil {
ps.logger.Warn(fmt.Sprintf("Failed to unmarshal received message: %s", err))
return
}
if err := h.Handle(&msg); err != nil {
ps.logger.Warn(fmt.Sprintf("Failed to handle SuperMQ message: %s", err))
}
}
}
// Contains checks if a topic is present.
func (s subscription) contains(topic string) bool {
return s.indexOf(topic) != -1
}
// Finds the index of an item in the topics.
func (s subscription) indexOf(element string) int {
for k, v := range s.topics {
if element == v {
return k
}
}
return -1
}
// Deletes a topic from the slice.
func (s *subscription) delete(topic string) bool {
index := s.indexOf(topic)
if index == -1 {
return false
}
topics := make([]string, len(s.topics)-1)
copy(topics[:index], s.topics[:index])
copy(topics[index:], s.topics[index+1:])
s.topics = topics
return true
}
+474
View File
@@ -0,0 +1,474 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package mqtt_test
import (
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/absmach/supermq/pkg/messaging"
mqttpubsub "github.com/absmach/supermq/pkg/messaging/mqtt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
)
const (
topic = "topic"
chansPrefix = "channels"
channel = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b"
subtopic = "engine"
tokenTimeout = 100 * time.Millisecond
)
var data = []byte("payload")
// ErrFailedHandleMessage indicates that the message couldn't be handled.
var errFailedHandleMessage = errors.New("failed to handle supermq message")
func TestPublisher(t *testing.T) {
msgChan := make(chan []byte)
// Subscribing with topic, and with subtopic, so that we can publish messages.
client, err := newClient(address, "clientID1", brokerTimeout)
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
token := client.Subscribe(topic, qos, func(_ mqtt.Client, m mqtt.Message) {
msgChan <- m.Payload()
})
if ok := token.WaitTimeout(tokenTimeout); !ok {
assert.Fail(t, fmt.Sprintf("failed to subscribe to topic %s", topic))
}
assert.Nil(t, token.Error(), fmt.Sprintf("got unexpected error: %s", token.Error()))
token = client.Subscribe(fmt.Sprintf("%s.%s", topic, subtopic), qos, func(_ mqtt.Client, m mqtt.Message) {
msgChan <- m.Payload()
})
if ok := token.WaitTimeout(tokenTimeout); !ok {
assert.Fail(t, fmt.Sprintf("failed to subscribe to topic %s", fmt.Sprintf("%s.%s", topic, subtopic)))
}
assert.Nil(t, token.Error(), fmt.Sprintf("got unexpected error: %s", token.Error()))
t.Cleanup(func() {
token := client.Unsubscribe(topic, fmt.Sprintf("%s.%s", topic, subtopic))
token.WaitTimeout(tokenTimeout)
assert.Nil(t, token.Error(), fmt.Sprintf("got unexpected error: %s", token.Error()))
client.Disconnect(100)
})
// Test publish with an empty topic.
err = pubsub.Publish(context.TODO(), "", &messaging.Message{Payload: data})
assert.Equal(t, err, mqttpubsub.ErrEmptyTopic, fmt.Sprintf("Publish with empty topic: expected: %s, got: %s", mqttpubsub.ErrEmptyTopic, err))
cases := []struct {
desc string
channel string
subtopic string
payload []byte
}{
{
desc: "publish message with nil payload",
payload: nil,
},
{
desc: "publish message with string payload",
payload: data,
},
{
desc: "publish message with channel",
payload: data,
channel: channel,
},
{
desc: "publish message with subtopic",
payload: data,
subtopic: subtopic,
},
{
desc: "publish message with channel and subtopic",
payload: data,
channel: channel,
subtopic: subtopic,
},
}
for _, tc := range cases {
expectedMsg := messaging.Message{
Publisher: "clientID11",
Channel: tc.channel,
Subtopic: tc.subtopic,
Payload: tc.payload,
}
err := pubsub.Publish(context.TODO(), topic, &expectedMsg)
assert.Nil(t, err, fmt.Sprintf("%s: got unexpected error: %s\n", tc.desc, err))
data, err := proto.Marshal(&expectedMsg)
assert.Nil(t, err, fmt.Sprintf("%s: failed to serialize protobuf error: %s\n", tc.desc, err))
receivedMsg := <-msgChan
if tc.payload != nil {
assert.Equal(t, expectedMsg.GetPayload(), receivedMsg, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, data, receivedMsg))
}
}
}
func TestSubscribe(t *testing.T) {
msgChan := make(chan *messaging.Message)
// Creating client to Publish messages to subscribed topic.
client, err := newClient(address, "supermq", brokerTimeout)
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
t.Cleanup(func() {
client.Unsubscribe()
client.Disconnect(100)
})
cases := []struct {
desc string
topic string
clientID string
err error
handler messaging.MessageHandler
}{
{
desc: "Subscribe to a topic with an ID",
topic: topic,
clientID: "clientid1",
err: nil,
handler: handler{false, "clientid1", msgChan},
},
{
desc: "Subscribe to the same topic with a different ID",
topic: topic,
clientID: "clientid2",
err: nil,
handler: handler{false, "clientid2", msgChan},
},
{
desc: "Subscribe to an already subscribed topic with an ID",
topic: topic,
clientID: "clientid1",
err: nil,
handler: handler{false, "clientid1", msgChan},
},
{
desc: "Subscribe to a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s", topic, subtopic),
clientID: "clientid1",
err: nil,
handler: handler{false, "clientid1", msgChan},
},
{
desc: "Subscribe to an already subscribed topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s", topic, subtopic),
clientID: "clientid1",
err: nil,
handler: handler{false, "clientid1", msgChan},
},
{
desc: "Subscribe to an empty topic with an ID",
topic: "",
clientID: "clientid1",
err: mqttpubsub.ErrEmptyTopic,
handler: handler{false, "clientid1", msgChan},
},
{
desc: "Subscribe to a topic with empty id",
topic: topic,
clientID: "",
err: mqttpubsub.ErrEmptyID,
handler: handler{false, "", msgChan},
},
}
for _, tc := range cases {
subCfg := messaging.SubscriberConfig{
ID: tc.clientID,
Topic: tc.topic,
Handler: tc.handler,
}
err = pubsub.Subscribe(context.TODO(), subCfg)
assert.Equal(t, err, tc.err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, err, tc.err))
if tc.err == nil {
expectedMsg := messaging.Message{
Publisher: "clientID1",
Channel: channel,
Subtopic: subtopic,
Payload: data,
}
data, err := proto.Marshal(&expectedMsg)
assert.Nil(t, err, fmt.Sprintf("%s: failed to serialize protobuf error: %s\n", tc.desc, err))
token := client.Publish(tc.topic, qos, false, data)
token.WaitTimeout(tokenTimeout)
assert.Nil(t, token.Error(), fmt.Sprintf("got unexpected error: %s", token.Error()))
receivedMsg := <-msgChan
assert.Equal(t, expectedMsg.Channel, receivedMsg.Channel, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Created, receivedMsg.Created, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Protocol, receivedMsg.Protocol, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Publisher, receivedMsg.Publisher, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Subtopic, receivedMsg.Subtopic, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Payload, receivedMsg.Payload, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
}
}
}
func TestPubSub(t *testing.T) {
msgChan := make(chan *messaging.Message)
cases := []struct {
desc string
topic string
clientID string
err error
handler messaging.MessageHandler
}{
{
desc: "Subscribe to a topic with an ID",
topic: topic,
clientID: "clientid7",
err: nil,
handler: handler{false, "clientid7", msgChan},
},
{
desc: "Subscribe to the same topic with a different ID",
topic: topic,
clientID: "clientid8",
err: nil,
handler: handler{false, "clientid8", msgChan},
},
{
desc: "Subscribe to a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s", topic, subtopic),
clientID: "clientid7",
err: nil,
handler: handler{false, "clientid7", msgChan},
},
{
desc: "Subscribe to an empty topic with an ID",
topic: "",
clientID: "clientid7",
err: mqttpubsub.ErrEmptyTopic,
handler: handler{false, "clientid7", msgChan},
},
{
desc: "Subscribe to a topic with empty id",
topic: topic,
clientID: "",
err: mqttpubsub.ErrEmptyID,
handler: handler{false, "", msgChan},
},
}
for _, tc := range cases {
subCfg := messaging.SubscriberConfig{
ID: tc.clientID,
Topic: tc.topic,
Handler: tc.handler,
}
err := pubsub.Subscribe(context.TODO(), subCfg)
assert.Equal(t, err, tc.err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, err, tc.err))
if tc.err == nil {
// Use pubsub to subscribe to a topic, and then publish messages to that topic.
expectedMsg := messaging.Message{
Publisher: "clientID",
Channel: channel,
Subtopic: subtopic,
Payload: data,
}
data, err := proto.Marshal(&expectedMsg)
assert.Nil(t, err, fmt.Sprintf("%s: failed to serialize protobuf error: %s\n", tc.desc, err))
msg := messaging.Message{
Payload: data,
}
// Publish message, and then receive it on message channel.
err = pubsub.Publish(context.TODO(), topic, &msg)
assert.Nil(t, err, fmt.Sprintf("%s: got unexpected error: %s\n", tc.desc, err))
receivedMsg := <-msgChan
assert.Equal(t, expectedMsg.Channel, receivedMsg.Channel, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Created, receivedMsg.Created, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Protocol, receivedMsg.Protocol, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Publisher, receivedMsg.Publisher, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Subtopic, receivedMsg.Subtopic, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
assert.Equal(t, expectedMsg.Payload, receivedMsg.Payload, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg))
}
}
}
func TestUnsubscribe(t *testing.T) {
msgChan := make(chan *messaging.Message)
cases := []struct {
desc string
topic string
clientID string
err error
subscribe bool // True for subscribe and false for unsubscribe.
handler messaging.MessageHandler
}{
{
desc: "Subscribe to a topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid4",
err: nil,
subscribe: true,
handler: handler{false, "clientid4", msgChan},
},
{
desc: "Subscribe to the same topic with a different ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid9",
err: nil,
subscribe: true,
handler: handler{false, "clientid9", msgChan},
},
{
desc: "Unsubscribe from a topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid4",
err: nil,
subscribe: false,
handler: handler{false, "clientid4", msgChan},
},
{
desc: "Unsubscribe from same topic with different ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid9",
err: nil,
subscribe: false,
handler: handler{false, "clientid9", msgChan},
},
{
desc: "Unsubscribe from a non-existent topic with an ID",
topic: "h",
clientID: "clientid4",
err: mqttpubsub.ErrNotSubscribed,
subscribe: false,
handler: handler{false, "clientid4", msgChan},
},
{
desc: "Unsubscribe from an already unsubscribed topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid4",
err: mqttpubsub.ErrNotSubscribed,
subscribe: false,
handler: handler{false, "clientid4", msgChan},
},
{
desc: "Subscribe to a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientidd4",
err: nil,
subscribe: true,
handler: handler{false, "clientidd4", msgChan},
},
{
desc: "Unsubscribe from a topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientidd4",
err: nil,
subscribe: false,
handler: handler{false, "clientidd4", msgChan},
},
{
desc: "Unsubscribe from an already unsubscribed topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientid4",
err: mqttpubsub.ErrNotSubscribed,
subscribe: false,
handler: handler{false, "clientid4", msgChan},
},
{
desc: "Unsubscribe from an empty topic with an ID",
topic: "",
clientID: "clientid4",
err: mqttpubsub.ErrEmptyTopic,
subscribe: false,
handler: handler{false, "clientid4", msgChan},
},
{
desc: "Unsubscribe from a topic with empty ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "",
err: mqttpubsub.ErrEmptyID,
subscribe: false,
handler: handler{false, "", msgChan},
},
{
desc: "Subscribe to a new topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"),
clientID: "clientid55",
err: nil,
subscribe: true,
handler: handler{true, "clientid5", msgChan},
},
{
desc: "Unsubscribe from a topic with an ID with failing handler",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"),
clientID: "clientid55",
err: errFailedHandleMessage,
subscribe: false,
handler: handler{true, "clientid5", msgChan},
},
{
desc: "Subscribe to a new topic with subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic+"2", subtopic),
clientID: "clientid55",
err: nil,
subscribe: true,
handler: handler{true, "clientid5", msgChan},
},
{
desc: "Unsubscribe from a topic with subtopic with an ID with failing handler",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic+"2", subtopic),
clientID: "clientid55",
err: errFailedHandleMessage,
subscribe: false,
handler: handler{true, "clientid5", msgChan},
},
}
for _, tc := range cases {
subCfg := messaging.SubscriberConfig{
ID: tc.clientID,
Topic: tc.topic,
Handler: tc.handler,
}
switch tc.subscribe {
case true:
err := pubsub.Subscribe(context.TODO(), subCfg)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, tc.err, err))
default:
err := pubsub.Unsubscribe(context.TODO(), tc.clientID, tc.topic)
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, tc.err, err))
}
}
}
type handler struct {
fail bool
publisher string
msgChan chan *messaging.Message
}
func (h handler) Handle(msg *messaging.Message) error {
if msg.GetPublisher() != h.publisher {
h.msgChan <- msg
}
return nil
}
func (h handler) Cancel() error {
if h.fail {
return errFailedHandleMessage
}
return nil
}
+121
View File
@@ -0,0 +1,121 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package mqtt_test
import (
"fmt"
"log"
"log/slog"
"os"
"os/signal"
"syscall"
"testing"
"time"
smqlog "github.com/absmach/supermq/logger"
"github.com/absmach/supermq/pkg/messaging"
mqttpubsub "github.com/absmach/supermq/pkg/messaging/mqtt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
)
var (
pubsub messaging.PubSub
logger *slog.Logger
address string
)
const (
username = "supermq-mqtt"
qos = 2
port = "1883/tcp"
brokerTimeout = 30 * time.Second
poolMaxWait = 120 * time.Second
)
func TestMain(m *testing.M) {
pool, err := dockertest.NewPool("")
if err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}
container, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "eclipse-mosquitto",
Tag: "1.6.15",
}, func(config *docker.HostConfig) {
config.AutoRemove = true
config.RestartPolicy = docker.RestartPolicy{Name: "no"}
})
if err != nil {
log.Fatalf("Could not start container: %s", err)
}
handleInterrupt(pool, container)
address = fmt.Sprintf("%s:%s", "localhost", container.GetPort(port))
pool.MaxWait = poolMaxWait
logger, err = smqlog.New(os.Stdout, "debug")
if err != nil {
log.Fatal(err.Error())
}
if err := pool.Retry(func() error {
pubsub, err = mqttpubsub.NewPubSub(address, "supermq", "", 2, brokerTimeout, logger)
return err
}); err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}
code := m.Run()
if err := pool.Purge(container); err != nil {
log.Fatalf("Could not purge container: %s", err)
}
os.Exit(code)
defer func() {
err = pubsub.Close()
if err != nil {
log.Fatal(err.Error())
}
}()
}
func handleInterrupt(pool *dockertest.Pool, container *dockertest.Resource) {
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
if err := pool.Purge(container); err != nil {
log.Fatalf("Could not purge container: %s", err)
}
os.Exit(0)
}()
}
func newClient(address, id string, timeout time.Duration) (mqtt.Client, error) {
opts := mqtt.NewClientOptions().
SetUsername(username).
AddBroker(address).
SetClientID(id)
client := mqtt.NewClient(opts)
token := client.Connect()
if token.Error() != nil {
return nil, token.Error()
}
ok := token.WaitTimeout(timeout)
if !ok {
return nil, mqttpubsub.ErrConnect
}
if token.Error() != nil {
return nil, token.Error()
}
return client, nil
}
+18 -18
View File
@@ -6,23 +6,23 @@ WebSocket adapter provides a [WebSocket](https://en.wikipedia.org/wiki/WebSocket
The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values.
| Variable | Description | Default |
| ------------------------------- | ----------------------------------------------------------------------------------- | --------------------------------- |
| SMQ_WS_ADAPTER_LOG_LEVEL | Log level for the WS Adapter (debug, info, warn, error) | info |
| SMQ_WS_ADAPTER_HTTP_HOST | Service WS host | "" |
| SMQ_WS_ADAPTER_HTTP_PORT | Service WS port | 8190 |
| SMQ_WS_ADAPTER_HTTP_SERVER_CERT | Path to the PEM encoded server certificate file | "" |
| SMQ_WS_ADAPTER_HTTP_SERVER_KEY | Path to the PEM encoded server key file | "" |
| SMQ_CLIENTS_GRPC_URL | Clients service Auth gRPC URL | <localhost:7000> |
| SMQ_CLIENTS_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s |
| SMQ_CLIENTS_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" |
| SMQ_CLIENTS_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" |
| SMQ_CLIENTS_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" |
| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | <nats://localhost:4222> |
| SMQ_JAEGER_URL | Jaeger server URL | <http://localhost:4318/v1/traces> |
| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 |
| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true |
| SMQ_WS_ADAPTER_INSTANCE_ID | Service instance ID | "" |
| Variable | Description | Default |
| ------------------------------- | ----------------------------------------------------------------------------------- | ----------------------------------- |
| SMQ_WS_ADAPTER_LOG_LEVEL | Log level for the WS Adapter (debug, info, warn, error) | info |
| SMQ_WS_ADAPTER_HTTP_HOST | Service WS host | "" |
| SMQ_WS_ADAPTER_HTTP_PORT | Service WS port | 8190 |
| SMQ_WS_ADAPTER_HTTP_SERVER_CERT | Path to the PEM encoded server certificate file | "" |
| SMQ_WS_ADAPTER_HTTP_SERVER_KEY | Path to the PEM encoded server key file | "" |
| SMQ_CLIENTS_GRPC_URL | Clients service Auth gRPC URL | <localhost:7000> |
| SMQ_CLIENTS_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s |
| SMQ_CLIENTS_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" |
| SMQ_CLIENTS_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" |
| SMQ_CLIENTS_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" |
| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | <amqp://guest:guest@rabbitmq:5672/> |
| SMQ_JAEGER_URL | Jaeger server URL | <http://localhost:4318/v1/traces> |
| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 |
| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true |
| SMQ_WS_ADAPTER_INSTANCE_ID | Service instance ID | "" |
## Deployment
@@ -54,7 +54,7 @@ SMQ_CLIENTS_GRPC_TIMEOUT=1s \
SMQ_CLIENTS_GRPC_CLIENT_CERT="" \
SMQ_CLIENTS_GRPC_CLIENT_KEY="" \
SMQ_CLIENTS_GRPC_SERVER_CERTS="" \
SMQ_MESSAGE_BROKER_URL=nats://localhost:4222 \
SMQ_MESSAGE_BROKER_URL=amqp://guest:guest@rabbitmq:5672/ \
SMQ_JAEGER_URL=http://localhost:14268/api/traces \
SMQ_JAEGER_TRACE_RATIO=1.0 \
SMQ_SEND_TELEMETRY=true \