certficate integration with ATOM

Signed-off-by: Arvindh <arvindh91@gmail.com>
This commit is contained in:
Arvindh
2026-06-08 14:45:38 +05:30
parent 78bcf0a1d5
commit f4f5dd4890
9 changed files with 263 additions and 155 deletions
+39 -1
View File
@@ -75,6 +75,19 @@ func (fp fanoutPublisher) Close() error {
return errors.Join(errs...)
}
type writerBridgeHandler struct {
ctx context.Context
publisher messaging.Publisher
}
func (h writerBridgeHandler) Handle(msg *messaging.Message) error {
return h.publisher.Publish(h.ctx, messaging.EncodeMessageTopic(msg), msg)
}
func (h writerBridgeHandler) Cancel() error {
return nil
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
@@ -120,7 +133,7 @@ func main() {
}
atomAuthz := atom.NewClient(atomCfg)
authn := atomauthn.NewAuthentication()
clientsClient := atom.NewClientsCompat(authn)
clientsClient := atom.NewClientsCompat(authn, atomAuthz)
domainsClient := atom.NewDomainsCompat(atomAuthz)
channelsClient := atom.NewChannelsCompat(atomAuthz)
logger.Info("FluxMQ authentication, authorization, and route resolution configured to use Atom")
@@ -201,6 +214,31 @@ func main() {
defer writerPublisher.Close()
publisher := fanoutPublisher{publishers: []messaging.Publisher{messagePublisher, writerPublisher}}
writerBridge, err := fluxmqbroker.NewPubSub(
ctx,
cfg.BrokerURL,
logger,
fluxmqbroker.DirectTopicOnly(),
fluxmqbroker.ConnectionName("fluxmq-mqtt-writer-bridge"),
)
if err != nil {
logger.Error(fmt.Sprintf("failed to create MQTT writer bridge subscriber: %s", err))
exitCode = 1
return
}
defer writerBridge.Close()
if err := writerBridge.Subscribe(ctx, messaging.SubscriberConfig{
ID: cfg.InstanceID + "-mqtt-writer-bridge",
Topic: "m/#",
Handler: writerBridgeHandler{ctx: ctx, publisher: writerPublisher},
DeliveryPolicy: messaging.DeliverNewPolicy,
}); err != nil {
logger.Error(fmt.Sprintf("failed to subscribe MQTT writer bridge: %s", err))
exitCode = 1
return
}
logger.Info("FluxMQ MQTT writer bridge subscribed", "topic", "m/#")
httpServerConfig := server.Config{Port: "9026"}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Error(fmt.Sprintf("failed to load publish proxy HTTP server configuration: %s", err))
+10 -3
View File
@@ -147,7 +147,6 @@ ATOM_JWKS_URL=http://atom:8080/.well-known/jwks.json
ATOM_JWT_ISSUER=http://nginx:80
ATOM_JWT_AUDIENCE=magistrala
ATOM_CORS_ALLOWED_ORIGINS=http://localhost:3000,http://localhost
ATOM_UI_RELEASE_TAG=latest
ATOM_UI_HTTP_PORT=3005
ATOM_SERVICE_TOKEN=
ATOM_SERVICE_USERNAME=mg-service
@@ -156,6 +155,7 @@ ATOM_ADMIN_TOKEN=
ATOM_ADMIN_USERNAME=admin
ATOM_TIMEOUT=5s
ATOM_HTTP_PORT=8080
ATOM_GRPC_ADDR=0.0.0.0:8081
ATOM_DB_PORT=6010
ATOM_DB_USER=atom
ATOM_DB_PASSWORD=atom
@@ -164,6 +164,13 @@ ATOM_JWT_SECRET=change-me-in-production
ATOM_JWT_EXPIRY_SECS=3600
ATOM_ADMIN_SECRET=12345678
ATOM_MIN_PASSWORD_CHARS=8
ATOM_CERTS_ENABLED=true
ATOM_CERTS_CA_MODE=file_root_issuer
ATOM_CERTS_ROOT_CA_CERT_PATH=/certs/ca.crt
ATOM_CERTS_ROOT_CA_KEY_PATH=/certs/ca.key
ATOM_CERTS_CA_DIR=./ssl/certs
ATOM_CERTS_LEAF_DEFAULT_TTL_SECS=2592000
ATOM_CERTS_LEAF_MAX_TTL_SECS=2592000
ATOM_RUST_LOG=info
ATOM_INVITATION_REDIRECT=http://localhost:3000/invitations/accept
ATOM_INVITATION_EXPIRY_SECS=604800
@@ -403,8 +410,8 @@ MG_CERTS_DB_MAX_CONNECTIONS=100
## OpenBao Configuration for Certs
MG_CERTS_OPENBAO_HOST=http://openbao:8200
MG_CERTS_OPENBAO_APP_ROLE=absmach
MG_CERTS_OPENBAO_APP_SECRET=absmach
MG_CERTS_OPENBAO_APP_ROLE=certs-dev-role
MG_CERTS_OPENBAO_APP_SECRET=certs-dev-secret
MG_CERTS_OPENBAO_NAMESPACE=
MG_CERTS_OPENBAO_PKI_PATH=pki
MG_CERTS_OPENBAO_ROLE=absmach
+10 -123
View File
@@ -26,8 +26,6 @@ volumes:
magistrala-re-db-volume:
magistrala-alarms-db-volume:
magistrala-reports-db-volume:
magistrala-certs-db-volume:
magistrala-openbao-data:
magistrala-timescale-writer-volume:
magistrala-fluxmq-node1-volume:
magistrala-fluxmq-node2-volume:
@@ -65,6 +63,7 @@ services:
environment:
DATABASE_URL: postgres://${ATOM_DB_USER}:${ATOM_DB_PASSWORD}@atom-db:5432/${ATOM_DB_NAME}
LISTEN_ADDR: 0.0.0.0:8080
GRPC_ADDR: ${ATOM_GRPC_ADDR:-0.0.0.0:8081}
JWT_SECRET: ${ATOM_JWT_SECRET}
JWT_EXPIRY_SECS: ${ATOM_JWT_EXPIRY_SECS}
ATOM_PUBLIC_BASE_URL: ${ATOM_PUBLIC_URL}
@@ -81,18 +80,25 @@ services:
ATOM_SMTP_TLS: ${ATOM_SMTP_TLS}
ADMIN_SECRET: ${ATOM_ADMIN_SECRET}
ATOM_SERVICE_SECRET: ${ATOM_SERVICE_SECRET}
ATOM_CERTS_ENABLED: ${ATOM_CERTS_ENABLED:-true}
ATOM_CERTS_CA_MODE: ${ATOM_CERTS_CA_MODE:-file_root_issuer}
ATOM_CERTS_ROOT_CA_CERT_PATH: ${ATOM_CERTS_ROOT_CA_CERT_PATH:-/certs/ca.crt}
ATOM_CERTS_ROOT_CA_KEY_PATH: ${ATOM_CERTS_ROOT_CA_KEY_PATH:-/certs/ca.key}
ATOM_CERTS_LEAF_DEFAULT_TTL_SECS: ${ATOM_CERTS_LEAF_DEFAULT_TTL_SECS:-2592000}
ATOM_CERTS_LEAF_MAX_TTL_SECS: ${ATOM_CERTS_LEAF_MAX_TTL_SECS:-2592000}
ATOM_MIN_PASSWORD_CHARS: ${ATOM_MIN_PASSWORD_CHARS}
RUST_LOG: ${ATOM_RUST_LOG}
ports:
- ${ATOM_HTTP_PORT}:8080
volumes:
- ${ATOM_CERTS_CA_DIR:-./ssl/certs}:/certs:ro
networks:
- magistrala-base-net
atom-ui:
image: ghcr.io/absmach/atom-ui:${ATOM_UI_RELEASE_TAG:-latest}
image: ghcr.io/absmach/atom-ui:latest
container_name: magistrala-atom-ui
restart: on-failure
profiles: ["atom-ui"]
depends_on:
- atom
environment:
@@ -1922,122 +1928,3 @@ services:
- "4000:3000"
networks:
- magistrala-base-net
certs:
image: ghcr.io/absmach/magistrala/certs:${MG_RELEASE_TAG}
container_name: magistrala-certs
depends_on:
openbao:
condition: service_healthy
certs-db:
condition: service_started
restart: on-failure
networks:
- magistrala-base-net
environment:
MG_CERTS_LOG_LEVEL: ${MG_CERTS_LOG_LEVEL}
MG_CERTS_HTTP_HOST: ${MG_CERTS_HTTP_HOST}
MG_CERTS_HTTP_PORT: ${MG_CERTS_HTTP_PORT}
MG_CERTS_GRPC_HOST: ${MG_CERTS_GRPC_HOST}
MG_CERTS_GRPC_PORT: ${MG_CERTS_GRPC_PORT}
MG_JAEGER_URL: ${MG_JAEGER_URL}
MG_JAEGER_TRACE_RATIO: ${MG_JAEGER_TRACE_RATIO}
MG_CERTS_OPENBAO_HOST: ${MG_CERTS_OPENBAO_HOST}
MG_CERTS_OPENBAO_APP_ROLE: ${MG_CERTS_OPENBAO_APP_ROLE}
MG_CERTS_OPENBAO_APP_SECRET: ${MG_CERTS_OPENBAO_APP_SECRET}
MG_CERTS_OPENBAO_NAMESPACE: ${MG_CERTS_OPENBAO_NAMESPACE}
MG_CERTS_OPENBAO_PKI_PATH: ${MG_CERTS_OPENBAO_PKI_PATH}
MG_CERTS_OPENBAO_ROLE: ${MG_CERTS_OPENBAO_ROLE}
MG_CERTS_OPENBAO_SECRET_ID_TTL: ${MG_CERTS_OPENBAO_SECRET_ID_TTL}
MG_CERTS_DB_HOST: ${MG_CERTS_DB_HOST}
MG_CERTS_DB_PORT: ${MG_CERTS_DB_PORT}
MG_CERTS_DB_USER: ${MG_CERTS_DB_USER}
MG_CERTS_DB_PASS: ${MG_CERTS_DB_PASS}
MG_CERTS_DB: ${MG_CERTS_DB}
MG_CERTS_DB_SSL_MODE: ${MG_CERTS_DB_SSL_MODE}
ATOM_URL: ${ATOM_URL}
ATOM_SERVICE_TOKEN: ${ATOM_SERVICE_TOKEN}
ATOM_SERVICE_USERNAME: ${ATOM_SERVICE_USERNAME}
ATOM_SERVICE_SECRET: ${ATOM_SERVICE_SECRET}
ATOM_JWKS_URL: ${ATOM_JWKS_URL}
ATOM_JWT_ISSUER: ${ATOM_JWT_ISSUER}
ATOM_JWT_AUDIENCE: ${ATOM_JWT_AUDIENCE}
ATOM_ADMIN_TOKEN: ${ATOM_ADMIN_TOKEN}
ATOM_ADMIN_USERNAME: ${ATOM_ADMIN_USERNAME}
ATOM_ADMIN_SECRET: ${ATOM_ADMIN_SECRET}
ATOM_TIMEOUT: ${ATOM_TIMEOUT}
MG_CERTS_SECRET: ${MG_CERTS_SECRET}
MG_CERTS_SERVICE_TOKEN_PATH: ${MG_CERTS_SERVICE_TOKEN_PATH}
MG_CERTS_SECRET_ID_PATH: ${MG_CERTS_SECRET_ID_PATH}
MG_CERTS_SECRET_RENEW_THRESHOLD: ${MG_CERTS_SECRET_RENEW_THRESHOLD}
MG_CERTS_SECRET_CHECK_INTERVAL: ${MG_CERTS_SECRET_CHECK_INTERVAL}
MG_ALLOW_UNVERIFIED_USER: ${MG_ALLOW_UNVERIFIED_USER}
ports:
- ${MG_CERTS_HTTP_PORT}:${MG_CERTS_HTTP_PORT}
- ${MG_CERTS_GRPC_PORT}:${MG_CERTS_GRPC_PORT}
volumes:
- magistrala-openbao-data:/openbao:ro
certs-db:
image: docker.io/postgres:16.2-alpine
container_name: magistrala-certs-db
restart: on-failure
networks:
- magistrala-base-net
command: postgres -c "max_connections=${MG_CERTS_DB_MAX_CONNECTIONS}"
environment:
POSTGRES_USER: ${MG_CERTS_DB_USER}
POSTGRES_PASSWORD: ${MG_CERTS_DB_PASS}
POSTGRES_DB: ${MG_CERTS_DB}
ports:
- 5454:5432
volumes:
- magistrala-certs-db-volume:/var/lib/postgresql/data
openbao:
image: openbao/openbao:2.4.0
container_name: magistrala-openbao
restart: on-failure
networks:
- magistrala-base-net
ports:
- 8200:8200
healthcheck:
test: ["CMD", "sh", "-c", "test -f /opt/openbao/data/service_token"]
interval: 5s
timeout: 3s
retries: 20
start_period: 30s
environment:
- BAO_ADDR=http://127.0.0.1:8200
- BAO_LOG_LEVEL=info
- MG_CERTS_OPENBAO_PKI_ROLE=${MG_CERTS_OPENBAO_ROLE}
- MG_CERTS_OPENBAO_APP_ROLE=${MG_CERTS_OPENBAO_APP_ROLE}
- MG_CERTS_OPENBAO_APP_SECRET=${MG_CERTS_OPENBAO_APP_SECRET}
- MG_CERTS_OPENBAO_SECRET_ID_TTL=${MG_CERTS_OPENBAO_SECRET_ID_TTL}
- MG_CERTS_OPENBAO_NAMESPACE=${MG_CERTS_OPENBAO_NAMESPACE}
- MG_CERTS_OPENBAO_PKI_CA_CN=${MG_CERTS_OPENBAO_PKI_CA_CN}
- MG_CERTS_OPENBAO_PKI_CA_OU=${MG_CERTS_OPENBAO_PKI_CA_OU}
- MG_CERTS_OPENBAO_PKI_CA_O=${MG_CERTS_OPENBAO_PKI_CA_O}
- MG_CERTS_OPENBAO_PKI_CA_C=${MG_CERTS_OPENBAO_PKI_CA_C}
- MG_CERTS_OPENBAO_PKI_CA_L=${MG_CERTS_OPENBAO_PKI_CA_L}
- MG_CERTS_OPENBAO_PKI_CA_ST=${MG_CERTS_OPENBAO_PKI_CA_ST}
- MG_CERTS_OPENBAO_PKI_CA_ADDR=${MG_CERTS_OPENBAO_PKI_CA_ADDR}
- MG_CERTS_OPENBAO_PKI_CA_PO=${MG_CERTS_OPENBAO_PKI_CA_PO}
- MG_CERTS_OPENBAO_PKI_CA_DNS_NAMES=${MG_CERTS_OPENBAO_PKI_CA_DNS_NAMES}
- MG_CERTS_OPENBAO_PKI_CA_IP_ADDRESSES=${MG_CERTS_OPENBAO_PKI_CA_IP_ADDRESSES}
- MG_CERTS_OPENBAO_PKI_CA_URI_SANS=${MG_CERTS_OPENBAO_PKI_CA_URI_SANS}
- MG_CERTS_OPENBAO_PKI_CA_EMAIL_ADDRESSES=${MG_CERTS_OPENBAO_PKI_CA_EMAIL_ADDRESSES}
- MG_CERTS_OPENBAO_UNSEAL_KEY_1=${MG_CERTS_OPENBAO_UNSEAL_KEY_1}
- MG_CERTS_OPENBAO_UNSEAL_KEY_2=${MG_CERTS_OPENBAO_UNSEAL_KEY_2}
- MG_CERTS_OPENBAO_UNSEAL_KEY_3=${MG_CERTS_OPENBAO_UNSEAL_KEY_3}
- MG_CERTS_OPENBAO_ROOT_TOKEN=${MG_CERTS_OPENBAO_ROOT_TOKEN}
cap_add:
- IPC_LOCK
mem_swappiness: 0
volumes:
- magistrala-openbao-data:/opt/openbao/data
- magistrala-openbao-data:/opt/openbao/config
- ./openbao-entrypoint.sh:/entrypoint.sh
entrypoint: /bin/sh
command: /entrypoint.sh
+5 -1
View File
@@ -104,9 +104,13 @@ func (s *connectServer) Authorize(ctx context.Context, req *connect.Request[auth
}
if s.atomAuth != nil {
action := "subscribe"
if connType == connections.Publish {
action = "publish"
}
res, err := s.atomAuth.CheckAuthz(ctx, atom.AuthzRequest{
SubjectID: req.Msg.GetExternalId(),
Action: "connect",
Action: action,
ResourceID: channelID,
ObjectKind: "resource",
ObjectID: channelID,
+30 -5
View File
@@ -5,6 +5,7 @@ package atom
import (
"context"
"net/http"
"strings"
channelsv1 "github.com/absmach/magistrala/api/grpc/channels/v1"
@@ -19,17 +20,36 @@ import (
)
type AtomClientsCompat struct {
Authn smqauthn.Authentication
Authn smqauthn.Authentication
Client *Client
}
func NewClientsCompat(authn smqauthn.Authentication) clientsv1.ClientsServiceClient {
return AtomClientsCompat{Authn: authn}
func NewClientsCompat(authn smqauthn.Authentication, client ...*Client) clientsv1.ClientsServiceClient {
atomClient := NewClient(LoadConfig())
if len(client) > 0 && client[0] != nil {
atomClient = client[0]
}
return AtomClientsCompat{Authn: authn, Client: atomClient}
}
func (c AtomClientsCompat) Authenticate(ctx context.Context, in *clientsv1.AuthnReq, _ ...grpc.CallOption) (*clientsv1.AuthnRes, error) {
token := in.GetToken()
if _, _, key, err := smqauthn.AuthUnpack(token); err == nil {
token = key
if prefix, id, key, err := smqauthn.AuthUnpack(token); err == nil {
switch prefix {
case smqauthn.BasicAuth:
res, loginErr := c.Client.LoginPassword(ctx, id, key)
if loginErr == nil {
return &clientsv1.AuthnRes{Authenticated: true, Id: res.EntityID}, nil
}
if !isAtomUnauthorized(loginErr) {
return nil, loginErr
}
token = key
case smqauthn.DomainAuth:
token = key
case smqauthn.Unknown:
token = key
}
}
session, err := c.Authn.Authenticate(ctx, token)
if err != nil {
@@ -38,6 +58,11 @@ func (c AtomClientsCompat) Authenticate(ctx context.Context, in *clientsv1.Authn
return &clientsv1.AuthnRes{Authenticated: true, Id: session.UserID}, nil
}
func isAtomUnauthorized(err error) bool {
atomErr, ok := err.(Error)
return ok && atomErr.StatusCode == http.StatusUnauthorized
}
func (c AtomClientsCompat) RetrieveEntity(context.Context, *commonv1.RetrieveEntityReq, ...grpc.CallOption) (*commonv1.RetrieveEntityRes, error) {
return nil, status.Error(codes.Unimplemented, "atom clients compatibility only supports Authenticate")
}
+107
View File
@@ -0,0 +1,107 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package atom
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
clientsv1 "github.com/absmach/magistrala/api/grpc/clients/v1"
smqauthn "github.com/absmach/magistrala/pkg/authn"
)
type recordingAuthn struct {
called bool
token string
session smqauthn.Session
err error
}
func (r *recordingAuthn) Authenticate(_ context.Context, token string) (smqauthn.Session, error) {
r.called = true
r.token = token
return r.session, r.err
}
func TestAtomClientsCompatAuthenticatesBasicPasswordWithAtomLogin(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost || r.URL.Path != "/auth/login" {
t.Fatalf("unexpected request: %s %s", r.Method, r.URL.Path)
}
var got LoginRequest
if err := json.NewDecoder(r.Body).Decode(&got); err != nil {
t.Fatalf("decode login request: %v", err)
}
if got.Identifier != "entity-1" || got.Secret != "device-secret" || got.Kind != "password" {
t.Fatalf("unexpected login request: %+v", got)
}
_ = json.NewEncoder(w).Encode(LoginResponse{
Token: "jwt",
EntityID: "entity-1",
SessionID: "session-1",
ExpiresAt: time.Now().Add(time.Hour),
})
}))
defer srv.Close()
fallback := &recordingAuthn{}
compat := NewClientsCompat(fallback, NewClient(Config{URL: srv.URL, Timeout: time.Second}))
token := smqauthn.AuthPack(smqauthn.BasicAuth, "entity-1", "device-secret")
res, err := compat.Authenticate(context.Background(), &clientsv1.AuthnReq{Token: token})
if err != nil {
t.Fatalf("authenticate basic password: %v", err)
}
if !res.GetAuthenticated() || res.GetId() != "entity-1" {
t.Fatalf("unexpected response: %+v", res)
}
if fallback.called {
t.Fatal("token fallback should not be called after successful Atom password login")
}
}
func TestAtomClientsCompatFallsBackToBearerTokenWhenBasicPasswordRejected(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
http.Error(w, "invalid credentials", http.StatusUnauthorized)
}))
defer srv.Close()
fallback := &recordingAuthn{session: smqauthn.Session{UserID: "entity-2"}}
compat := NewClientsCompat(fallback, NewClient(Config{URL: srv.URL, Timeout: time.Second}))
token := smqauthn.AuthPack(smqauthn.BasicAuth, "entity-1", "atom_token")
res, err := compat.Authenticate(context.Background(), &clientsv1.AuthnReq{Token: token})
if err != nil {
t.Fatalf("authenticate fallback token: %v", err)
}
if !fallback.called || fallback.token != "atom_token" {
t.Fatalf("unexpected fallback call: called=%v token=%q", fallback.called, fallback.token)
}
if !res.GetAuthenticated() || res.GetId() != "entity-2" {
t.Fatalf("unexpected response: %+v", res)
}
}
func TestAtomClientsCompatDoesNotHideAtomPasswordLoginFailures(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
http.Error(w, "atom unavailable", http.StatusInternalServerError)
}))
defer srv.Close()
fallback := &recordingAuthn{session: smqauthn.Session{UserID: "entity-2"}}
compat := NewClientsCompat(fallback, NewClient(Config{URL: srv.URL, Timeout: time.Second}))
token := smqauthn.AuthPack(smqauthn.BasicAuth, "entity-1", "device-secret")
_, err := compat.Authenticate(context.Background(), &clientsv1.AuthnReq{Token: token})
if err == nil {
t.Fatal("expected Atom login failure")
}
if fallback.called {
t.Fatal("token fallback should not be called for non-authentication Atom failures")
}
}
+20
View File
@@ -20,6 +20,7 @@ type options struct {
prefix string
connectionName string
directTopicIngress bool
directTopicOnly bool
}
func defaultOptions() options {
@@ -85,6 +86,25 @@ func DirectTopicIngress() messaging.Option {
}
}
// DirectTopicOnly subscribes only to regular MQTT topics and skips stream queue
// consumption. This is intended for bridge services that observe broker-native
// topics without also consuming queued messages.
func DirectTopicOnly() messaging.Option {
return func(val any) error {
switch v := val.(type) {
case *publisher:
return nil
case *pubsub:
v.directTopicIngress = true
v.directTopicOnly = true
default:
return ErrInvalidType
}
return nil
}
}
// JSStreamConfig is a no-op for FluxMQ AMQP backend and exists only to keep
// option-compatibility with legacy NATS broker wrappers.
func JSStreamConfig(_ jetstream.StreamConfig) messaging.Option {
+29 -22
View File
@@ -93,29 +93,31 @@ func (ps *pubsub) Subscribe(_ context.Context, cfg messaging.SubscriberConfig) e
}
group := formatConsumerName(cfg.Topic, cfg.ID)
opts := &fluxamqp.StreamConsumeOptions{
QueueName: ps.prefix,
Filter: streamFilter(ps.prefix, cfg.Topic),
ConsumerGroup: group,
}
sub := subscription{}
switch cfg.DeliveryPolicy {
case messaging.DeliverNewPolicy:
opts.Offset = "last"
case messaging.DeliverAllPolicy:
opts.Offset = "first"
}
if err := ps.client.SubscribeToStream(opts, func(msg *fluxamqp.QueueMessage) {
if err := ps.handle(cfg.Handler, msg); err != nil {
ps.logWarn("failed to process FluxMQ stream message", "error", err, "topic", cfg.Topic, "consumer_group", group)
if !ps.directTopicOnly {
opts := &fluxamqp.StreamConsumeOptions{
QueueName: ps.prefix,
Filter: streamFilter(ps.prefix, cfg.Topic),
ConsumerGroup: group,
}
}); err != nil {
return err
}
sub := subscription{
streamTopic: queueFilter(ps.prefix, cfg.Topic),
switch cfg.DeliveryPolicy {
case messaging.DeliverNewPolicy:
opts.Offset = "last"
case messaging.DeliverAllPolicy:
opts.Offset = "first"
}
if err := ps.client.SubscribeToStream(opts, func(msg *fluxamqp.QueueMessage) {
if err := ps.handle(cfg.Handler, msg); err != nil {
ps.logWarn("failed to process FluxMQ stream message", "error", err, "topic", cfg.Topic, "consumer_group", group)
}
}); err != nil {
return err
}
sub.streamTopic = queueFilter(ps.prefix, cfg.Topic)
}
if ps.directTopicIngress {
// Subscribe to regular MQTT topics so that messages published directly
@@ -126,7 +128,9 @@ func (ps *pubsub) Subscribe(_ context.Context, cfg messaging.SubscriberConfig) e
ps.logWarn("failed to process FluxMQ topic message", "error", err, "topic", sub.mqttTopic)
}
}); err != nil {
_ = ps.client.UnsubscribeFromStream(sub.streamTopic)
if sub.streamTopic != "" {
_ = ps.client.UnsubscribeFromStream(sub.streamTopic)
}
return err
}
@@ -155,7 +159,10 @@ func (ps *pubsub) Unsubscribe(_ context.Context, id, topic string) error {
return ErrNotSubscribed
}
streamErr := ps.client.UnsubscribeFromStream(sub.streamTopic)
var streamErr error
if sub.streamTopic != "" {
streamErr = ps.client.UnsubscribeFromStream(sub.streamTopic)
}
var topicErr error
if sub.mqttTopic != "" {
topicErr = ps.client.Unsubscribe(sub.mqttTopic)
+13
View File
@@ -226,3 +226,16 @@ func TestMessageFromDeliveryZeroTimestampFallsBackToNow(t *testing.T) {
t.Fatalf("expected created timestamp between %d and %d, got %d", before, after, got.Created)
}
}
func TestDirectTopicOnlyEnablesDirectIngressAndSkipsStream(t *testing.T) {
ps := &pubsub{}
if err := DirectTopicOnly()(ps); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !ps.directTopicIngress {
t.Fatal("expected direct topic ingress to be enabled")
}
if !ps.directTopicOnly {
t.Fatal("expected stream consumption to be skipped")
}
}