SMQ-3137 - Proxy CoAP using mgate (#2222)

Signed-off-by: Felix Gateru <felix.gateru@gmail.com>
This commit is contained in:
Felix Gateru
2025-10-24 18:55:02 +03:00
committed by GitHub
parent 64b4497ad4
commit dc0df1c955
8 changed files with 282 additions and 54 deletions
+64 -2
View File
@@ -8,10 +8,15 @@ import (
"context"
"fmt"
"log"
"log/slog"
"net/url"
"os"
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/mgate"
mgatecoap "github.com/absmach/mgate/pkg/coap"
"github.com/absmach/mgate/pkg/session"
mgtls "github.com/absmach/mgate/pkg/tls"
"github.com/absmach/supermq"
"github.com/absmach/supermq/coap"
httpapi "github.com/absmach/supermq/coap/api"
@@ -30,6 +35,7 @@ import (
httpserver "github.com/absmach/supermq/pkg/server/http"
"github.com/absmach/supermq/pkg/uuid"
"github.com/caarlos0/env/v11"
"github.com/pion/dtls/v3"
"golang.org/x/sync/errgroup"
)
@@ -37,12 +43,15 @@ const (
svcName = "coap_adapter"
envPrefix = "SMQ_COAP_ADAPTER_"
envPrefixHTTP = "SMQ_COAP_ADAPTER_HTTP_"
envPrefixDTLS = "SMQ_COAP_ADAPTER_SERVER_"
envPrefixCache = "SMQ_COAP_CACHE_"
envPrefixClients = "SMQ_CLIENTS_GRPC_"
envPrefixChannels = "SMQ_CHANNELS_GRPC_"
envPrefixDomains = "SMQ_DOMAINS_GRPC_"
defSvcHTTPPort = "5683"
defSvcCoAPPort = "5683"
targetProtocol = "coap"
targetCoapPort = "5682"
)
type config struct {
@@ -94,6 +103,13 @@ func main() {
return
}
dtlsCfg, err := mgtls.NewConfig(env.Options{Prefix: envPrefixDTLS})
if err != nil {
logger.Error(fmt.Sprintf("failed to load %s DTLS configuration : %s", svcName, err))
exitCode = 1
return
}
cacheConfig := messaging.CacheConfig{}
if err := env.ParseWithOptions(&cacheConfig, env.Options{Prefix: envPrefixCache}); err != nil {
logger.Error(fmt.Sprintf("failed to load cache configuration : %s", err))
@@ -196,7 +212,7 @@ func main() {
exitCode = 1
return
}
cs := coapserver.NewServer(ctx, cancel, svcName, coapServerConfig, httpapi.MakeCoAPHandler(svc, channelsClient, parser, logger), logger)
cs := coapserver.NewServer(ctx, cancel, svcName, server.Config{Host: coapServerConfig.Host, Port: targetCoapPort}, httpapi.MakeCoAPHandler(svc, channelsClient, parser, logger), logger)
if cfg.SendTelemetry {
chc := chclient.New(svcName, supermq.Version, logger, cancel)
@@ -207,7 +223,11 @@ func main() {
return hs.Start()
})
g.Go(func() error {
return cs.Start()
g.Go(func() error {
return cs.Start()
})
handler := coap.NewHandler(logger, clientsClient, channelsClient, parser)
return proxyCoAP(ctx, coapServerConfig, dtlsCfg, handler, logger)
})
g.Go(func() error {
return server.StopSignalHandler(ctx, cancel, logger, svcName, hs, cs)
@@ -217,3 +237,45 @@ func main() {
logger.Error(fmt.Sprintf("CoAP adapter service terminated: %s", err))
}
}
func proxyCoAP(ctx context.Context, cfg server.Config, dtlsCfg mgtls.Config, handler session.Handler, logger *slog.Logger) error {
var err error
config := mgate.Config{
Host: "",
Port: cfg.Port,
TargetProtocol: targetProtocol,
TargetHost: cfg.Host,
TargetPort: targetCoapPort,
}
mg := mgatecoap.NewProxy(config, handler, logger)
errCh := make(chan error)
config.DTLSConfig, err = mgtls.LoadTLSConfig(&dtlsCfg, &dtls.Config{})
if err != nil {
return err
}
switch {
case config.DTLSConfig != nil:
dltsCfg := config
mgDtls := mgatecoap.NewProxy(dltsCfg, handler, logger)
logger.Info(fmt.Sprintf("Starting COAP with DTLS proxy on port %s", cfg.Port))
go func() {
errCh <- mgDtls.Listen(ctx)
}()
default:
logger.Info(fmt.Sprintf("Starting COAP without DTLS proxy on port %s", cfg.Port))
go func() {
errCh <- mg.Listen(ctx)
}()
}
select {
case <-ctx.Done():
logger.Info(fmt.Sprintf("proxy COAP shutdown at %s:%s", config.Host, config.Port))
return nil
case err := <-errCh:
return err
}
}
-43
View File
@@ -73,21 +73,6 @@ func (svc *adapterService) Publish(ctx context.Context, key string, msg *messagi
if topicType == messaging.HealthType {
return nil
}
authzRes, err := svc.channels.Authorize(ctx, &grpcChannelsV1.AuthzReq{
DomainId: msg.GetDomain(),
ClientId: authnRes.GetId(),
ClientType: policies.ClientType,
Type: uint32(connections.Publish),
ChannelId: msg.GetChannel(),
})
if err != nil {
return errors.Wrap(svcerr.ErrAuthorization, err)
}
if !authzRes.Authorized {
return svcerr.ErrAuthorization
}
msg.Publisher = authnRes.GetId()
return svc.pubsub.Publish(ctx, messaging.EncodeMessageTopic(msg), msg)
@@ -105,19 +90,6 @@ func (svc *adapterService) Subscribe(ctx context.Context, key, domainID, chanID,
}
clientID := authnRes.GetId()
authzRes, err := svc.channels.Authorize(ctx, &grpcChannelsV1.AuthzReq{
DomainId: domainID,
ClientId: clientID,
ClientType: policies.ClientType,
Type: uint32(connections.Subscribe),
ChannelId: chanID,
})
if err != nil {
return errors.Wrap(svcerr.ErrAuthorization, err)
}
if !authzRes.Authorized {
return svcerr.ErrAuthorization
}
subject := messaging.EncodeTopic(domainID, chanID, subtopic)
authzc := newAuthzClient(clientID, domainID, chanID, subtopic, svc.channels, c)
@@ -140,21 +112,6 @@ func (svc *adapterService) Unsubscribe(ctx context.Context, key, domainID, chanI
if !authnRes.Authenticated {
return svcerr.ErrAuthentication
}
authzRes, err := svc.channels.Authorize(ctx, &grpcChannelsV1.AuthzReq{
DomainId: domainID,
ClientId: authnRes.GetId(),
ClientType: policies.ClientType,
Type: uint32(connections.Subscribe),
ChannelId: chanID,
})
if err != nil {
return errors.Wrap(svcerr.ErrAuthorization, err)
}
if !authzRes.Authorized {
return svcerr.ErrAuthorization
}
subject := messaging.EncodeTopic(domainID, chanID, subtopic)
return svc.pubsub.Unsubscribe(ctx, token, subject)
+183
View File
@@ -0,0 +1,183 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package coap
import (
"context"
"fmt"
"log/slog"
"net/http"
"strings"
mgate "github.com/absmach/mgate/pkg/coap"
"github.com/absmach/mgate/pkg/session"
grpcChannelsV1 "github.com/absmach/supermq/api/grpc/channels/v1"
grpcClientsV1 "github.com/absmach/supermq/api/grpc/clients/v1"
smqauthn "github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/connections"
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/messaging"
"github.com/absmach/supermq/pkg/policies"
)
var _ session.Handler = (*handler)(nil)
// Log message formats.
const (
subscribedInfoFmt = "subscribed with client_id %s to topics %s"
publishedInfoFmt = "published with client_id %s to the topic %s"
)
// Error wrappers for COAP errors.
var (
errClientNotInitialized = errors.New("client is not initialized")
errMissingTopicPub = errors.New("failed to publish due to missing topic")
errMissingTopicSub = errors.New("failed to subscribe due to missing topic")
errFailedPublish = errors.New("failed to publish")
)
type handler struct {
clients grpcClientsV1.ClientsServiceClient
channels grpcChannelsV1.ChannelsServiceClient
logger *slog.Logger
parser messaging.TopicParser
}
// NewHandler creates new Handler entity.
func NewHandler(logger *slog.Logger, clients grpcClientsV1.ClientsServiceClient, channels grpcChannelsV1.ChannelsServiceClient, parser messaging.TopicParser) session.Handler {
return &handler{
logger: logger,
clients: clients,
channels: channels,
parser: parser,
}
}
// AuthConnect is called on device connection,
// prior forwarding to the coap server.
func (h *handler) AuthConnect(ctx context.Context) error {
return nil
}
// AuthPublish is called on device publish,
// prior forwarding to the coap server.
func (h *handler) AuthPublish(ctx context.Context, topic *string, payload *[]byte) error {
if topic == nil {
return errMissingTopicPub
}
s, ok := session.FromContext(ctx)
if !ok {
return errClientNotInitialized
}
domainID, channelID, _, topicType, err := h.parser.ParsePublishTopic(ctx, *topic, true)
if err != nil {
return mgate.NewCOAPProxyError(http.StatusBadRequest, errors.Wrap(errFailedPublish, err))
}
clientID, err := h.authAccess(ctx, string(s.Password), domainID, channelID, connections.Publish, topicType)
if err != nil {
return err
}
s.Username = clientID
return nil
}
// AuthSubscribe is called on device publish,
// prior forwarding to the COAP broker.
func (h *handler) AuthSubscribe(ctx context.Context, topics *[]string) error {
s, ok := session.FromContext(ctx)
if !ok {
return errClientNotInitialized
}
if topics == nil || *topics == nil {
return errMissingTopicSub
}
for _, topic := range *topics {
domainID, channelID, _, topicType, err := h.parser.ParseSubscribeTopic(ctx, topic, true)
if err != nil {
return err
}
if _, err := h.authAccess(ctx, string(s.Password), domainID, channelID, connections.Subscribe, topicType); err != nil {
return err
}
}
return nil
}
// Connect - after client successfully connected.
func (h *handler) Connect(ctx context.Context) error {
return nil
}
// Publish - after client successfully published.
func (h *handler) Publish(ctx context.Context, topic *string, payload *[]byte) error {
s, ok := session.FromContext(ctx)
if !ok {
return errClientNotInitialized
}
if len(*payload) == 0 {
h.logger.Warn("Empty payload, not publishing to broker", slog.String("client_id", s.Username))
return nil
}
h.logger.Info(fmt.Sprintf(publishedInfoFmt, s.Username, *topic))
return nil
}
// Subscribe - after client successfully subscribed.
func (h *handler) Subscribe(ctx context.Context, topics *[]string) error {
s, ok := session.FromContext(ctx)
if !ok {
return errClientNotInitialized
}
h.logger.Info(fmt.Sprintf(subscribedInfoFmt, s.Username, strings.Join(*topics, ",")))
return nil
}
// Unsubscribe - after client unsubscribed.
func (h *handler) Unsubscribe(ctx context.Context, topics *[]string) error {
return nil
}
// Disconnect - connection with broker or client lost.
func (h *handler) Disconnect(ctx context.Context) error {
return nil
}
func (h *handler) authAccess(ctx context.Context, secret, domainID, chanID string, msgType connections.ConnType, topicType messaging.TopicType) (string, error) {
authnRes, err := h.clients.Authenticate(ctx, &grpcClientsV1.AuthnReq{Token: smqauthn.AuthPack(smqauthn.DomainAuth, domainID, secret)})
if err != nil {
return "", mgate.NewCOAPProxyError(http.StatusUnauthorized, svcerr.ErrAuthentication)
}
if !authnRes.Authenticated {
return "", mgate.NewCOAPProxyError(http.StatusUnauthorized, svcerr.ErrAuthentication)
}
if topicType == messaging.HealthType {
return authnRes.GetId(), nil
}
ar := &grpcChannelsV1.AuthzReq{
Type: uint32(msgType),
ClientId: authnRes.GetId(),
ClientType: policies.ClientType,
ChannelId: chanID,
DomainId: domainID,
}
res, err := h.channels.Authorize(ctx, ar)
if err != nil {
return "", mgate.NewCOAPProxyError(http.StatusUnauthorized, errors.Wrap(svcerr.ErrAuthentication, err))
}
if !res.GetAuthorized() {
return "", mgate.NewCOAPProxyError(http.StatusUnauthorized, svcerr.ErrAuthentication)
}
return authnRes.GetId(), nil
}
+5 -2
View File
@@ -398,11 +398,14 @@ SMQ_MQTT_ADAPTER_CACHE_MAX_COST=1048576
SMQ_MQTT_ADAPTER_CACHE_BUFFER_ITEMS=64
### CoAP
## If enabled run make all inside docker/ssl directory to generate the DTLS certs
SMQ_COAP_DTLS=
SMQ_COAP_ADAPTER_LOG_LEVEL=debug
SMQ_COAP_ADAPTER_HOST=coap-adapter
SMQ_COAP_ADAPTER_PORT=5683
SMQ_COAP_ADAPTER_SERVER_CERT=
SMQ_COAP_ADAPTER_SERVER_KEY=
SMQ_COAP_ADAPTER_SERVER_CERT_FILE=${SMQ_COAP_DTLS:+./ssl/certs/coap-server.crt}
SMQ_COAP_ADAPTER_SERVER_KEY_FILE=${SMQ_COAP_DTLS:+./ssl/certs/coap-server.key}
SMQ_COAP_ADAPTER_SERVER_CA_FILE=${SMQ_COAP_DTLS:+./ssl/certs/coap-server-ca.crt}
SMQ_COAP_ADAPTER_HTTP_HOST=coap-adapter
SMQ_COAP_ADAPTER_HTTP_PORT=5683
SMQ_COAP_ADAPTER_HTTP_SERVER_CERT=
+19 -2
View File
@@ -1346,8 +1346,9 @@ services:
SMQ_COAP_ADAPTER_LOG_LEVEL: ${SMQ_COAP_ADAPTER_LOG_LEVEL}
SMQ_COAP_ADAPTER_HOST: ${SMQ_COAP_ADAPTER_HOST}
SMQ_COAP_ADAPTER_PORT: ${SMQ_COAP_ADAPTER_PORT}
SMQ_COAP_ADAPTER_SERVER_CERT: ${SMQ_COAP_ADAPTER_SERVER_CERT}
SMQ_COAP_ADAPTER_SERVER_KEY: ${SMQ_COAP_ADAPTER_SERVER_KEY}
SMQ_COAP_ADAPTER_SERVER_CERT_FILE: ${SMQ_COAP_ADAPTER_SERVER_CERT_FILE:+/coap-server.crt}
SMQ_COAP_ADAPTER_SERVER_KEY_FILE: ${SMQ_COAP_ADAPTER_SERVER_KEY_FILE:+/coap-server.key}
SMQ_COAP_ADAPTER_SERVER_CA_FILE: ${SMQ_COAP_ADAPTER_SERVER_CA_FILE:+/coap-server-ca.crt}
SMQ_COAP_ADAPTER_HTTP_HOST: ${SMQ_COAP_ADAPTER_HTTP_HOST}
SMQ_COAP_ADAPTER_HTTP_PORT: ${SMQ_COAP_ADAPTER_HTTP_PORT}
SMQ_COAP_ADAPTER_HTTP_SERVER_CERT: ${SMQ_COAP_ADAPTER_HTTP_SERVER_CERT}
@@ -1382,6 +1383,22 @@ services:
networks:
- supermq-base-net
volumes:
# DTLS certificates for CoAP
- type: bind
source: ${SMQ_COAP_ADAPTER_SERVER_CERT_FILE:-ssl/certs/dummy/server_cert}
target: /coap-server${SMQ_COAP_ADAPTER_SERVER_CERT_FILE:+.crt}
bind:
create_host_path: true
- type: bind
source: ${SMQ_COAP_ADAPTER_SERVER_KEY_FILE:-ssl/certs/dummy/server_key}
target: /coap-server${SMQ_COAP_ADAPTER_SERVER_KEY_FILE:+.key}
bind:
create_host_path: true
- type: bind
source: ${SMQ_COAP_ADAPTER_SERVER_CA_FILE:-ssl/certs/dummy/server_ca}
target: /coap-server-ca${SMQ_COAP_ADAPTER_SERVER_CA_FILE:+.crt}
bind:
create_host_path: true
# Clients gRPC mTLS client certificates
- type: bind
source: ${SMQ_CLIENTS_GRPC_CLIENT_CERT:-ssl/certs/dummy/client_cert}
+7 -1
View File
@@ -40,6 +40,9 @@ CHANNELS_GRPC_SERVER_CN=channels
CHANNELS_GRPC_CLIENT_CN=channels-client
CHANNELS_GRPC_SERVER_CRT_FILE_NAME=channels-grpc-server
CHANNELS_GRPC_CLIENT_CRT_FILE_NAME=channels-grpc-client
COAP_DTLS_SERVER_CONF_FILE_NAME=coap-server.conf
COAP_DTLS_SERVER_CN=coap
COAP_DTLS_SERVER_CRT_FILE_NAME=coap-server
define GRPC_CERT_CONFIG
[req]
@@ -69,7 +72,7 @@ It can be downloaded from $(DOWNLOAD_URL).
etc, etc.
endef
all: clean_certs ca server_cert auth_grpc_certs domains_grpc_certs groups_grpc_certs clients_grpc_certs channels_grpc_certs
all: clean_certs ca server_cert auth_grpc_certs domains_grpc_certs groups_grpc_certs clients_grpc_certs channels_grpc_certs coap_dtls_certs
# CA name and key is "ca".
ca:
@@ -147,6 +150,9 @@ clients_grpc_certs:
channels_grpc_certs:
$(call gen_grpc_cert_pair,$(CHANNELS_GRPC_SERVER_CRT_FILE_NAME),$(CHANNELS_GRPC_SERVER_CN),$(CHANNELS_GRPC_CLIENT_CRT_FILE_NAME),$(CHANNELS_GRPC_CLIENT_CN))
coap_dtls_certs:
$(call gen_grpc_cert,$(COAP_DTLS_SERVER_CRT_FILE_NAME),$(COAP_DTLS_SERVER_CN))
clean_certs:
rm -r $(CRT_LOCATION)/*.crt
rm -r $(CRT_LOCATION)/*.key
+2 -2
View File
@@ -5,7 +5,7 @@ go 1.25.3
require (
github.com/0x6flab/namegenerator v1.4.0
github.com/absmach/callhome v0.18.2-0.20251020154744-4dd66476749a
github.com/absmach/mgate v0.4.6-0.20250605150648-edf967fbb46a
github.com/absmach/mgate v0.4.6-0.20251015080752-c33495ed8c0d
github.com/absmach/senml v1.0.8
github.com/authzed/authzed-go v1.6.0
github.com/authzed/grpcutil v0.0.0-20250221190651-1985b19b35b8
@@ -30,6 +30,7 @@ require (
github.com/oklog/ulid/v2 v2.1.1
github.com/ory/dockertest/v3 v3.12.0
github.com/pelletier/go-toml v1.9.5
github.com/pion/dtls/v3 v3.0.7
github.com/plgd-dev/go-coap/v3 v3.4.0
github.com/prometheus/client_golang v1.23.2
github.com/rabbitmq/amqp091-go v1.10.0
@@ -122,7 +123,6 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/opencontainers/runc v1.2.3 // indirect
github.com/pion/dtls/v3 v3.0.7 // indirect
github.com/pion/logging v0.2.4 // indirect
github.com/pion/transport/v3 v3.0.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
+2 -2
View File
@@ -21,8 +21,8 @@ github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrd
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/absmach/callhome v0.18.2-0.20251020154744-4dd66476749a h1:rT6fkATl2TZwyk77knRZMbjuKQPP2aT+Xyyud2pVlpc=
github.com/absmach/callhome v0.18.2-0.20251020154744-4dd66476749a/go.mod h1:LEXKhES9JJtj3tBgTZv7VPNjOi5ukJQB0mFic0QP60Q=
github.com/absmach/mgate v0.4.6-0.20250605150648-edf967fbb46a h1:1+772OQFHAS23JLAHrCZxO+DnGoiMllKcSwLQy74y+k=
github.com/absmach/mgate v0.4.6-0.20250605150648-edf967fbb46a/go.mod h1:X2amjQg/2cnM+UKblMdpU2M4cZO74xtEHNIxtuUXCeA=
github.com/absmach/mgate v0.4.6-0.20251015080752-c33495ed8c0d h1:uEoSX6kR/SjXYTrw6b33fd56Z0fBVi27BVirXIhNxwg=
github.com/absmach/mgate v0.4.6-0.20251015080752-c33495ed8c0d/go.mod h1:0KVq7mxM0wayosmyXPPxp1EL0c2d9kRp5V8NZCKdetA=
github.com/absmach/senml v1.0.8 h1:+opem/r4g6c6eA/JLyCIuksyEhj7eBdysY3pEmy1mqo=
github.com/absmach/senml v1.0.8/go.mod h1:DRhzHLgvQoIUHroBgpFrSWso+bJZO9E96RlHAHy+VRI=
github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ=