SMQ-3361 - Enable TLS and mTLS termination on MQTT adapter (#3360)

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>
This commit is contained in:
Felix Gateru
2026-02-23 18:14:31 +03:00
committed by GitHub
parent c7fad0d2ca
commit a871fa926e
4 changed files with 98 additions and 35 deletions
+29 -7
View File
@@ -6,6 +6,7 @@ package main
import (
"context"
"crypto/tls"
"fmt"
"io"
"log"
@@ -22,6 +23,7 @@ import (
mgatemqtt "github.com/absmach/mgate/pkg/mqtt"
"github.com/absmach/mgate/pkg/mqtt/websocket"
"github.com/absmach/mgate/pkg/session"
mgtls "github.com/absmach/mgate/pkg/tls"
"github.com/absmach/supermq"
smqlog "github.com/absmach/supermq/logger"
"github.com/absmach/supermq/mqtt"
@@ -51,6 +53,7 @@ const (
envPrefixClients = "SMQ_CLIENTS_GRPC_"
envPrefixChannels = "SMQ_CHANNELS_GRPC_"
envPrefixDomains = "SMQ_DOMAINS_GRPC_"
envPrefixMQTT = "SMQ_MQTT_ADAPTER_"
wsPathPrefix = "/mqtt"
)
@@ -122,6 +125,13 @@ func main() {
Port: cfg.HTTPTargetPort,
}
tlsCfg, err := mgtls.NewConfig(env.Options{Prefix: envPrefixMQTT})
if err != nil {
logger.Error(fmt.Sprintf("Failed to load TLS config: %s", err))
exitCode = 1
return
}
tp, err := jaegerclient.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID, cfg.TraceRatio)
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err))
@@ -261,12 +271,12 @@ func main() {
}
logger.Info(fmt.Sprintf("Starting MQTT proxy on port %s", cfg.MQTTPort))
g.Go(func() error {
return proxyMQTT(ctx, cfg, logger, h, beforeHandler, afterHandler)
return proxyMQTT(ctx, cfg, tlsCfg, logger, h, beforeHandler, afterHandler)
})
logger.Info(fmt.Sprintf("Starting MQTT over WS proxy on port %s", cfg.HTTPPort))
g.Go(func() error {
return proxyWS(ctx, cfg, logger, h, afterHandler)
return proxyWS(ctx, cfg, tlsCfg, logger, h, afterHandler)
})
g.Go(func() error {
@@ -278,17 +288,24 @@ func main() {
}
}
func proxyMQTT(ctx context.Context, cfg config, logger *slog.Logger, sessionHandler session.Handler, beforeHandler, afterHandler session.Interceptor) error {
func proxyMQTT(ctx context.Context, cfg config, tlsCfg mgtls.Config, logger *slog.Logger, sessionHandler session.Handler, beforeHandler, afterHandler session.Interceptor) error {
var err error
config := mgate.Config{
Port: cfg.MQTTPort,
TargetHost: cfg.MQTTTargetHost,
TargetPort: cfg.MQTTTargetPort,
}
mproxy := mgatemqtt.New(config, sessionHandler, beforeHandler, afterHandler, logger)
errCh := make(chan error)
config.TLSConfig, err = mgtls.LoadTLSConfig(&tlsCfg, &tls.Config{})
if err != nil {
return err
}
mgate := mgatemqtt.New(config, sessionHandler, beforeHandler, afterHandler, logger)
go func() {
errCh <- mproxy.Listen(ctx)
errCh <- mgate.Listen(ctx)
}()
select {
@@ -300,7 +317,8 @@ func proxyMQTT(ctx context.Context, cfg config, logger *slog.Logger, sessionHand
}
}
func proxyWS(ctx context.Context, cfg config, logger *slog.Logger, sessionHandler session.Handler, interceptor session.Interceptor) error {
func proxyWS(ctx context.Context, cfg config, tlsCfg mgtls.Config, logger *slog.Logger, sessionHandler session.Handler, interceptor session.Interceptor) error {
var err error
config := mgate.Config{
Port: cfg.HTTPPort,
TargetProtocol: "ws",
@@ -309,6 +327,10 @@ func proxyWS(ctx context.Context, cfg config, logger *slog.Logger, sessionHandle
TargetPath: cfg.HTTPTargetPath,
PathPrefix: wsPathPrefix,
}
config.TLSConfig, err = mgtls.LoadTLSConfig(&tlsCfg, &tls.Config{})
if err != nil {
return err
}
wp := websocket.New(config, sessionHandler, nil, interceptor, logger)
http.HandleFunc(wsPathPrefix, wp.ServeHTTP)
+6
View File
@@ -418,6 +418,12 @@ SMQ_MQTT_ADAPTER_ES_DB=0
SMQ_MQTT_ADAPTER_CACHE_NUM_COUNTERS=200000
SMQ_MQTT_ADAPTER_CACHE_MAX_COST=1048576
SMQ_MQTT_ADAPTER_CACHE_BUFFER_ITEMS=64
SMQ_MQTT_ADAPTER_CERT_FILE=
SMQ_MQTT_ADAPTER_KEY_FILE=
SMQ_MQTT_ADAPTER_SERVER_CA_FILE=
SMQ_MQTT_ADAPTER_CLIENT_CA_FILE=
SMQ_MQTT_ADAPTER_CERT_VERIFICATION_METHODS=
SMQ_MQTT_ADAPTER_OCSP_RESPONDER_URL=
### CoAP
## If enabled run make all inside docker/ssl directory to generate the DTLS certs
+27
View File
@@ -1234,6 +1234,12 @@ services:
SMQ_MQTT_ADAPTER_CACHE_NUM_COUNTERS: ${SMQ_MQTT_ADAPTER_CACHE_NUM_COUNTERS}
SMQ_MQTT_ADAPTER_CACHE_MAX_COST: ${SMQ_MQTT_ADAPTER_CACHE_MAX_COST}
SMQ_MQTT_ADAPTER_CACHE_BUFFER_ITEMS: ${SMQ_MQTT_ADAPTER_CACHE_BUFFER_ITEMS}
SMQ_MQTT_ADAPTER_CERT_FILE: ${SMQ_MQTT_ADAPTER_CERT_FILE:+/mqtt-adapter.crt}
SMQ_MQTT_ADAPTER_KEY_FILE: ${SMQ_MQTT_ADAPTER_KEY_FILE:+/mqtt-adapter.key}
SMQ_MQTT_ADAPTER_SERVER_CA_FILE: ${SMQ_MQTT_ADAPTER_SERVER_CA_FILE:+/mqtt-adapter-server-ca.crt}
SMQ_MQTT_ADAPTER_CLIENT_CA_FILE: ${SMQ_MQTT_ADAPTER_CLIENT_CA_FILE:+/mqtt-adapter-client-ca.crt}
SMQ_MQTT_ADAPTER_CERT_VERIFICATION_METHODS: ${SMQ_MQTT_ADAPTER_CERT_VERIFICATION_METHODS}
SMQ_MQTT_ADAPTER_OCSP_RESPONDER_URL: ${SMQ_MQTT_ADAPTER_OCSP_RESPONDER_URL}
SMQ_ES_URL: ${SMQ_ES_URL}
SMQ_CLIENTS_GRPC_URL: ${SMQ_CLIENTS_GRPC_URL}
SMQ_CLIENTS_GRPC_TIMEOUT: ${SMQ_CLIENTS_GRPC_TIMEOUT}
@@ -1257,6 +1263,27 @@ services:
networks:
- supermq-base-net
volumes:
# TLS certificate for MQTT
- type: bind
source: ${SMQ_MQTT_ADAPTER_CERT_FILE:-ssl/certs/dummy/server_cert}
target: /mqtt-adapter${SMQ_MQTT_ADAPTER_CERT_FILE:+.crt}
bind:
create_host_path: true
- type: bind
source: ${SMQ_MQTT_ADAPTER_KEY_FILE:-ssl/certs/dummy/server_key}
target: /mqtt-adapter${SMQ_MQTT_ADAPTER_KEY_FILE:+.key}
bind:
create_host_path: true
- type: bind
source: ${SMQ_MQTT_ADAPTER_SERVER_CA_FILE:-ssl/certs/dummy/server_ca}
target: /mqtt-adapter-server-ca${SMQ_MQTT_ADAPTER_SERVER_CA_FILE:+.crt}
bind:
create_host_path: true
- type: bind
source: ${SMQ_MQTT_ADAPTER_CLIENT_CA_FILE:-ssl/certs/dummy/client_ca}
target: /mqtt-adapter-client-ca${SMQ_MQTT_ADAPTER_CLIENT_CA_FILE:+.crt}
bind:
create_host_path: true
# Clients gRPC mTLS client certificates
- type: bind
source: ${SMQ_CLIENTS_GRPC_CLIENT_CERT:-ssl/certs/dummy/client_cert}
+36 -28
View File
@@ -6,34 +6,40 @@ MQTT adapter provides an MQTT API for sending messages through the platform. MQT
The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values.
| Variable | Description | Default |
| ----------------------------------------- | ----------------------------------------------------------------------------------- | ----------------------------------- |
| SMQ_MQTT_ADAPTER_LOG_LEVEL | Log level for the MQTT Adapter (debug, info, warn, error) | info |
| SMQ_MQTT_ADAPTER_MQTT_PORT | mProxy port | 1883 |
| SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST | MQTT broker host | localhost |
| SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT | MQTT broker port | 1883 |
| SMQ_MQTT_ADAPTER_MQTT_QOS | MQTT broker QoS | 1 |
| SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT | MQTT forwarder for multiprotocol communication timeout | 30s |
| SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK | URL of broker health check | "" |
| SMQ_MQTT_ADAPTER_WS_PORT | mProxy MQTT over WS port | 8080 |
| SMQ_MQTT_ADAPTER_WS_TARGET_HOST | MQTT broker host for MQTT over WS | localhost |
| SMQ_MQTT_ADAPTER_WS_TARGET_PORT | MQTT broker port for MQTT over WS | 8080 |
| SMQ_MQTT_ADAPTER_WS_TARGET_PATH | MQTT broker MQTT over WS path | /mqtt |
| SMQ_MQTT_ADAPTER_CACHE_NUM_COUNTERS | Number of cache counters to keep that hold access frequency information | 200000 |
| SMQ_MQTT_ADAPTER_CACHE_MAX_COST | Maximum size of the cache(in bytes) | 1048576 |
| SMQ_MQTT_ADAPTER_CACHE_BUFFER_ITEMS | Number of cache `Get` buffers | 64 |
| SMQ_MQTT_ADAPTER_INSTANCE | Instance name for MQTT adapter | "" |
| SMQ_CLIENTS_GRPC_URL | Clients service Auth gRPC URL | <localhost:7000> |
| SMQ_CLIENTS_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s |
| SMQ_CLIENTS_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" |
| SMQ_CLIENTS_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" |
| SMQ_CLIENTS_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" |
| SMQ_ES_URL | Event sourcing URL | <amqp://guest:guest@rabbitmq:5672/> |
| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | <amqp://guest:guest@rabbitmq:5672/> |
| SMQ_JAEGER_URL | Jaeger server URL | <http://localhost:4318/v1/traces> |
| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 |
| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true |
| SMQ_MQTT_ADAPTER_INSTANCE_ID | Service instance ID | "" |
| Variable | Description | Default |
| ------------------------------------------ | ----------------------------------------------------------------------------------- | ----------------------------------- |
| SMQ_MQTT_ADAPTER_LOG_LEVEL | Log level for the MQTT Adapter (debug, info, warn, error) | info |
| SMQ_MQTT_ADAPTER_MQTT_PORT | mProxy port | 1883 |
| SMQ_MQTT_ADAPTER_MQTT_TARGET_HOST | MQTT broker host | localhost |
| SMQ_MQTT_ADAPTER_MQTT_TARGET_PORT | MQTT broker port | 1883 |
| SMQ_MQTT_ADAPTER_MQTT_QOS | MQTT broker QoS | 1 |
| SMQ_MQTT_ADAPTER_FORWARDER_TIMEOUT | MQTT forwarder for multiprotocol communication timeout | 30s |
| SMQ_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK | URL of broker health check | "" |
| SMQ_MQTT_ADAPTER_WS_PORT | mProxy MQTT over WS port | 8080 |
| SMQ_MQTT_ADAPTER_WS_TARGET_HOST | MQTT broker host for MQTT over WS | localhost |
| SMQ_MQTT_ADAPTER_WS_TARGET_PORT | MQTT broker port for MQTT over WS | 8080 |
| SMQ_MQTT_ADAPTER_WS_TARGET_PATH | MQTT broker MQTT over WS path | /mqtt |
| SMQ_MQTT_ADAPTER_CACHE_NUM_COUNTERS | Number of cache counters to keep that hold access frequency information | 200000 |
| SMQ_MQTT_ADAPTER_CACHE_MAX_COST | Maximum size of the cache(in bytes) | 1048576 |
| SMQ_MQTT_ADAPTER_CACHE_BUFFER_ITEMS | Number of cache `Get` buffers | 64 |
| SMQ_MQTT_ADAPTER_INSTANCE | Instance name for MQTT adapter | "" |
| SMQ_CLIENTS_GRPC_URL | Clients service Auth gRPC URL | <localhost:7000> |
| SMQ_CLIENTS_GRPC_TIMEOUT | Clients service Auth gRPC request timeout in seconds | 1s |
| SMQ_CLIENTS_GRPC_CLIENT_CERT | Path to the PEM encoded clients service Auth gRPC client certificate file | "" |
| SMQ_CLIENTS_GRPC_CLIENT_KEY | Path to the PEM encoded clients service Auth gRPC client key file | "" |
| SMQ_CLIENTS_GRPC_SERVER_CERTS | Path to the PEM encoded clients server Auth gRPC server trusted CA certificate file | "" |
| SMQ_ES_URL | Event sourcing URL | <amqp://guest:guest@rabbitmq:5672/> |
| SMQ_MESSAGE_BROKER_URL | Message broker instance URL | <amqp://guest:guest@rabbitmq:5672/> |
| SMQ_JAEGER_URL | Jaeger server URL | <http://localhost:4318/v1/traces> |
| SMQ_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 |
| SMQ_SEND_TELEMETRY | Send telemetry to supermq call home server | true |
| SMQ_MQTT_ADAPTER_INSTANCE_ID | Service instance ID | "" |
| SMQ_MQTT_ADAPTER_CERT_FILE | Path to the PEM encoded TLS certificate file for MQTT adapter | "" |
| SMQ_MQTT_ADAPTER_KEY_FILE | Path to the PEM encoded TLS key file for MQTT adapter | "" |
| SMQ_MQTT_ADAPTER_SERVER_CA_FILE | Path to the PEM encoded server CA certificate file for MQTT adapter | "" |
| SMQ_MQTT_ADAPTER_CLIENT_CA_FILE | Path to the PEM encoded client CA certificate file for MQTT adapter | "" |
| SMQ_MQTT_ADAPTER_OCSP_RESPONDER_URL | URL of the OCSP responder for MQTT adapter | "" |
| SMQ_MQTT_ADAPTER_CERT_VERIFICATION_METHODS | Methods for certificate verification (e.g., ocsp) | "" |
## Deployment
@@ -86,4 +92,6 @@ $GOBIN/supermq-mqtt
Setting `SMQ_CLIENTS_GRPC_CLIENT_CERT` and `SMQ_CLIENTS_GRPC_CLIENT_KEY` will enable TLS against the clients service. The service expects a file in PEM format for both the certificate and the key. Setting `SMQ_CLIENTS_GRPC_SERVER_CERTS` will enable TLS against the clients service trusting only those CAs that are provided. The service expects a file in PEM format of trusted CAs.
Setting `SMQ_MQTT_ADAPTER_CERT_FILE`, `SMQ_MQTT_ADAPTER_KEY_FILE`, and `SMQ_MQTT_ADAPTER_SERVER_CA_FILE` will enable TLS for incoming MQTT connections. The service expects a file in PEM format for both the certificate and the key. The service expects a file in PEM format of trusted CAs. Setting `SMQ_MQTT_ADAPTER_CLIENT_CA_FILE` will enable client certificate verification for incoming MQTT connections trusting only those CAs that are provided. The service expects a file in PEM format of trusted CAs. Setting `SMQ_MQTT_ADAPTER_CERT_VERIFICATION_METHODS` to "ocsp" will enable OCSP verification for incoming MQTT connections. Setting `SMQ_MQTT_ADAPTER_OCSP_RESPONDER_URL` will set the OCSP responder URL for OCSP verification.
For more information about service capabilities and its usage, please check out the API documentation [API](https://github.com/absmach/supermq/blob/main/api/asyncapi/mqtt.yaml).