From ddce4f4a46209d7b4a362b66df5b336c475575d8 Mon Sep 17 00:00:00 2001 From: b1ackd0t <28790446+rodneyosodo@users.noreply.github.com> Date: Wed, 16 Apr 2025 13:36:12 +0300 Subject: [PATCH] NOISSUE - Fix Duplicate Streams (#117) * fix(re): duplicate streams Fixes nats: API error: code=400 err_code=10058 description=stream name already in use Align the codebase to use one config for alarms for both alarms service and rules engine Move writers config to utilize build tags * fix(brokers): change package names for brokers move re brokers package to consumers For RE broker, provide a publisher interface Move alarms broker from consumer package Signed-off-by: Rodney Osodo * fix(alarms): renaming rePubSub to msgSub --------- Signed-off-by: Rodney Osodo --- alarms/brokers/brokers_nats.go | 49 ++++++ alarms/brokers/brokers_rabbitmq.go | 35 +++++ alarms/consumer/brokers/brokers_nats.go | 39 ----- alarms/consumer/brokers/brokers_rabbitmq.go | 26 ---- cmd/alarms/main.go | 2 +- cmd/postgres-writer/main.go | 2 +- cmd/re/main.go | 140 +++++++----------- cmd/timescale-writer/main.go | 2 +- consumers/writers/brokers/brokers_nats.go | 49 ++++++ consumers/writers/brokers/brokers_rabbitmq.go | 35 +++++ 10 files changed, 222 insertions(+), 157 deletions(-) create mode 100644 alarms/brokers/brokers_nats.go create mode 100644 alarms/brokers/brokers_rabbitmq.go delete mode 100644 alarms/consumer/brokers/brokers_nats.go delete mode 100644 alarms/consumer/brokers/brokers_rabbitmq.go create mode 100644 consumers/writers/brokers/brokers_nats.go create mode 100644 consumers/writers/brokers/brokers_rabbitmq.go diff --git a/alarms/brokers/brokers_nats.go b/alarms/brokers/brokers_nats.go new file mode 100644 index 000000000..08affb83b --- /dev/null +++ b/alarms/brokers/brokers_nats.go @@ -0,0 +1,49 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +//go:build !rabbitmq +// +build !rabbitmq + +package brokers + +import ( + "context" + "log/slog" + "time" + + "github.com/absmach/supermq/pkg/messaging" + broker "github.com/absmach/supermq/pkg/messaging/nats" + "github.com/nats-io/nats.go/jetstream" +) + +const AllTopic = "alarms.>" + +var cfg = jetstream.StreamConfig{ + Name: "alarms", + Description: "SuperMQ stream alarms", + Subjects: []string{"alarms.>"}, + Retention: jetstream.LimitsPolicy, + MaxMsgsPerSubject: 1e6, + MaxAge: time.Hour * 24, + MaxMsgSize: 1024 * 1024, + Discard: jetstream.DiscardOld, + Storage: jetstream.FileStorage, +} + +func NewPubSub(ctx context.Context, url string, logger *slog.Logger) (messaging.PubSub, error) { + pb, err := broker.NewPubSub(ctx, url, logger, broker.JSStreamConfig(cfg)) + if err != nil { + return nil, err + } + + return pb, nil +} + +func NewPublisher(ctx context.Context, url string) (messaging.Publisher, error) { + pb, err := broker.NewPublisher(ctx, url, broker.JSStreamConfig(cfg)) + if err != nil { + return nil, err + } + + return pb, nil +} diff --git a/alarms/brokers/brokers_rabbitmq.go b/alarms/brokers/brokers_rabbitmq.go new file mode 100644 index 000000000..abdefed53 --- /dev/null +++ b/alarms/brokers/brokers_rabbitmq.go @@ -0,0 +1,35 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +//go:build rabbitmq +// +build rabbitmq + +package brokers + +import ( + "context" + "log/slog" + + "github.com/absmach/supermq/pkg/messaging" + broker "github.com/absmach/supermq/pkg/messaging/rabbitmq" +) + +const AllTopic = "alarms.#" + +func NewPubSub(_ context.Context, url string, logger *slog.Logger) (messaging.PubSub, error) { + pb, err := broker.NewPubSub(url, logger, broker.Prefix("writers")) + if err != nil { + return nil, err + } + + return pb, nil +} + +func NewPublisher(_ context.Context, url string) (messaging.Publisher, error) { + pb, err := broker.NewPublisher(url, broker.Prefix("writers")) + if err != nil { + return nil, err + } + + return pb, nil +} diff --git a/alarms/consumer/brokers/brokers_nats.go b/alarms/consumer/brokers/brokers_nats.go deleted file mode 100644 index 0f24a26dc..000000000 --- a/alarms/consumer/brokers/brokers_nats.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -//go:build !rabbitmq -// +build !rabbitmq - -package brokers - -import ( - "context" - "log/slog" - "time" - - "github.com/absmach/supermq/pkg/messaging" - broker "github.com/absmach/supermq/pkg/messaging/nats" - "github.com/nats-io/nats.go/jetstream" -) - -const AllTopic = "alarms.>" - -func NewPubSub(ctx context.Context, url string, logger *slog.Logger) (messaging.PubSub, error) { - cfg := jetstream.StreamConfig{ - Name: "alarms", - Description: "SuperMQ stream alarms", - Subjects: []string{"alarms.>"}, - Retention: jetstream.LimitsPolicy, - MaxMsgsPerSubject: 1e6, - MaxAge: time.Hour * 24, - MaxMsgSize: 1024 * 1024, - Discard: jetstream.DiscardOld, - Storage: jetstream.FileStorage, - } - pb, err := broker.NewPubSub(ctx, url, logger, broker.JSStreamConfig(cfg)) - if err != nil { - return nil, err - } - - return pb, nil -} diff --git a/alarms/consumer/brokers/brokers_rabbitmq.go b/alarms/consumer/brokers/brokers_rabbitmq.go deleted file mode 100644 index cb0960223..000000000 --- a/alarms/consumer/brokers/brokers_rabbitmq.go +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright (c) Abstract Machines -// SPDX-License-Identifier: Apache-2.0 - -//go:build rabbitmq -// +build rabbitmq - -package brokers - -import ( - "context" - "log/slog" - - "github.com/absmach/supermq/pkg/messaging" - broker "github.com/absmach/supermq/pkg/messaging/rabbitmq" -) - -const AllTopic = "alarms.#" - -func NewPubSub(ctx context.Context, url string, logger *slog.Logger) (messaging.PubSub, error) { - pb, err := broker.NewPubSub(ctx, url, logger, broker.Prefix("alarms")) - if err != nil { - return nil, err - } - - return pb, nil -} diff --git a/cmd/alarms/main.go b/cmd/alarms/main.go index 23b0d91b0..7dfc2dad3 100644 --- a/cmd/alarms/main.go +++ b/cmd/alarms/main.go @@ -12,8 +12,8 @@ import ( "github.com/absmach/magistrala/alarms" httpAPI "github.com/absmach/magistrala/alarms/api" + "github.com/absmach/magistrala/alarms/brokers" "github.com/absmach/magistrala/alarms/consumer" - "github.com/absmach/magistrala/alarms/consumer/brokers" "github.com/absmach/magistrala/alarms/middleware" alarmsRepo "github.com/absmach/magistrala/alarms/postgres" "github.com/absmach/magistrala/pkg/prometheus" diff --git a/cmd/postgres-writer/main.go b/cmd/postgres-writer/main.go index 98857d152..f5862b6cd 100644 --- a/cmd/postgres-writer/main.go +++ b/cmd/postgres-writer/main.go @@ -15,12 +15,12 @@ import ( chclient "github.com/absmach/callhome/pkg/client" consumertracing "github.com/absmach/magistrala/consumers/tracing" httpapi "github.com/absmach/magistrala/consumers/writers/api" + "github.com/absmach/magistrala/consumers/writers/brokers" writerpg "github.com/absmach/magistrala/consumers/writers/postgres" "github.com/absmach/supermq" "github.com/absmach/supermq/consumers" smqlog "github.com/absmach/supermq/logger" jaegerclient "github.com/absmach/supermq/pkg/jaeger" - "github.com/absmach/supermq/pkg/messaging/brokers" brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing" pgclient "github.com/absmach/supermq/pkg/postgres" "github.com/absmach/supermq/pkg/prometheus" diff --git a/cmd/re/main.go b/cmd/re/main.go index 68c1af1fe..af0d3c457 100644 --- a/cmd/re/main.go +++ b/cmd/re/main.go @@ -14,6 +14,8 @@ import ( "time" chclient "github.com/absmach/callhome/pkg/client" + abrokers "github.com/absmach/magistrala/alarms/brokers" + "github.com/absmach/magistrala/consumers/writers/brokers" "github.com/absmach/magistrala/internal/email" "github.com/absmach/magistrala/re" httpapi "github.com/absmach/magistrala/re/api" @@ -23,28 +25,22 @@ import ( "github.com/absmach/supermq" smqlog "github.com/absmach/supermq/logger" authnsvc "github.com/absmach/supermq/pkg/authn/authsvc" - mgauthz "github.com/absmach/supermq/pkg/authz" - authzsvc "github.com/absmach/supermq/pkg/authz/authsvc" "github.com/absmach/supermq/pkg/grpcclient" jaegerclient "github.com/absmach/supermq/pkg/jaeger" "github.com/absmach/supermq/pkg/messaging" + smqbrokers "github.com/absmach/supermq/pkg/messaging/brokers" brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing" - "github.com/absmach/supermq/pkg/messaging/nats" pgclient "github.com/absmach/supermq/pkg/postgres" "github.com/absmach/supermq/pkg/server" httpserver "github.com/absmach/supermq/pkg/server/http" "github.com/absmach/supermq/pkg/uuid" "github.com/caarlos0/env/v11" "github.com/go-chi/chi/v5" - "github.com/nats-io/nats.go/jetstream" - "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" ) const ( - svcName = "rules_engine" - channelsTopic = "channels.>" - + svcName = "rules_engine" envPrefixDB = "MG_RE_DB_" envPrefixHTTP = "MG_RE_HTTP_" envPrefixAuth = "SMQ_AUTH_GRPC_" @@ -53,45 +49,18 @@ const ( ) type config struct { - LogLevel string `env:"MG_RE_LOG_LEVEL" envDefault:"info"` - InstanceID string `env:"MG_RE_INSTANCE_ID" envDefault:""` - JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` - SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"` - ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"` - CacheURL string `env:"MG_RE_CACHE_URL" envDefault:"redis://localhost:6379/0"` - CacheKeyDuration time.Duration `env:"MG_RE_CACHE_KEY_DURATION" envDefault:"10m"` - TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"` - BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"` -} - -const ( - writersCfgName = "writers" - alarmsCfgName = "alarms" - - alarmsPrefix = "alarms" - writersPrefix = "writers" -) - -var ( - writersSubjects = []string{"writers.>"} - alarmsSubjects = []string{"alarms.>"} -) - -var jsStreamConfig = jetstream.StreamConfig{ - Retention: jetstream.LimitsPolicy, - Description: "SuperMQ Rules Engine stream for handling internal messages", - MaxMsgsPerSubject: 1e6, - MaxAge: time.Hour * 24, - MaxMsgSize: 1024 * 1024, - Discard: jetstream.DiscardOld, - Storage: jetstream.FileStorage, + LogLevel string `env:"MG_RE_LOG_LEVEL" envDefault:"info"` + InstanceID string `env:"MG_RE_INSTANCE_ID" envDefault:""` + JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` + SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"` + TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"` + BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"` } func main() { ctx, cancel := context.WithCancel(context.Background()) g, ctx := errgroup.WithContext(ctx) - // Create new rule engine configuration cfg := config{} if err := env.Parse(&cfg); err != nil { log.Fatalf("failed to load %s configuration : %s", svcName, err) @@ -110,6 +79,7 @@ func main() { if cfg.InstanceID, err = uuid.New().ID(); err != nil { logger.Error(fmt.Sprintf("failed to generate instanceID: %s", err)) exitCode = 1 + return } } @@ -118,20 +88,22 @@ func main() { if err := env.Parse(&ec); err != nil { logger.Error(fmt.Sprintf("failed to load email configuration : %s", err)) exitCode = 1 + return } - // Create new database for rule engine. dbConfig := pgclient.Config{Name: defDB} if err := env.ParseWithOptions(&dbConfig, env.Options{Prefix: envPrefixDB}); err != nil { logger.Error(err.Error()) exitCode = 1 + return } db, err := pgclient.Setup(dbConfig, *repg.Migration()) if err != nil { logger.Error(err.Error()) exitCode = 1 + return } defer db.Close() @@ -140,6 +112,7 @@ func main() { if err != nil { logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err)) exitCode = 1 + return } defer func() { @@ -149,87 +122,79 @@ func main() { }() tracer := tp.Tracer(svcName) - rePubSub, err := nats.NewPubSub(ctx, cfg.BrokerURL, logger) - if err != nil { - logger.Error(fmt.Sprintf("failed to connect to message broker for rePubSub: %s", err)) - exitCode = 1 - return - } - defer rePubSub.Close() - - writersCfg := jsStreamConfig - writersCfg.Name = writersCfgName - writersCfg.Subjects = writersSubjects - writersPub, err := nats.NewPublisher(ctx, cfg.BrokerURL, nats.JSStreamConfig(writersCfg), nats.Prefix(writersPrefix)) - if err != nil { - logger.Error(fmt.Sprintf("failed to connect to message broker for writers publisher: %s", err)) - exitCode = 1 - return - } - defer writersPub.Close() - - alarmsCfg := jsStreamConfig - alarmsCfg.Name = alarmsCfgName - alarmsCfg.Subjects = alarmsSubjects - alarmsPub, err := nats.NewPublisher(ctx, cfg.BrokerURL, nats.JSStreamConfig(alarmsCfg), nats.Prefix(alarmsPrefix)) - if err != nil { - logger.Error(fmt.Sprintf("failed to connect to message broker for alarms publisher: %s", err)) - exitCode = 1 - return - } - defer alarmsPub.Close() - httpServerConfig := server.Config{Port: defSvcHTTPPort} if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil { logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err)) exitCode = 1 + return } - rePubSub = brokerstracing.NewPubSub(httpServerConfig, tracer, rePubSub) + msgSub, err := smqbrokers.NewPubSub(ctx, cfg.BrokerURL, logger) + if err != nil { + logger.Error(fmt.Sprintf("failed to connect to message broker for mg pubSub: %s", err)) + exitCode = 1 + + return + } + defer msgSub.Close() + msgSub = brokerstracing.NewPubSub(httpServerConfig, tracer, msgSub) + + writersPub, err := brokers.NewPublisher(ctx, cfg.BrokerURL) + if err != nil { + logger.Error(fmt.Sprintf("failed to connect to message broker for writers publisher: %s", err)) + exitCode = 1 + + return + } + defer writersPub.Close() writersPub = brokerstracing.NewPublisher(httpServerConfig, tracer, writersPub) + + alarmsPub, err := abrokers.NewPublisher(ctx, cfg.BrokerURL) + if err != nil { + logger.Error(fmt.Sprintf("failed to connect to message broker for alarms publisher: %s", err)) + exitCode = 1 + + return + } + defer alarmsPub.Close() alarmsPub = brokerstracing.NewPublisher(httpServerConfig, tracer, alarmsPub) grpcCfg := grpcclient.Config{} if err := env.ParseWithOptions(&grpcCfg, env.Options{Prefix: envPrefixAuth}); err != nil { logger.Error(fmt.Sprintf("failed to load auth gRPC client configuration : %s", err)) exitCode = 1 + return } authn, authnClient, err := authnsvc.NewAuthentication(ctx, grpcCfg) if err != nil { logger.Error(err.Error()) exitCode = 1 + return } defer authnClient.Close() logger.Info("AuthN successfully connected to auth gRPC server " + authnClient.Secure()) - authz, authzClient, err := authzsvc.NewAuthorization(ctx, grpcCfg, nil) - if err != nil { - logger.Error(err.Error()) - exitCode = 1 - return - } - defer authzClient.Close() - logger.Info("AuthZ successfully connected to auth gRPC server " + authnClient.Secure()) - database := pgclient.NewDatabase(db, dbConfig, tracer) - svc, err := newService(ctx, database, rePubSub, writersPub, alarmsPub, authz, cfg.ESURL, tracer, ec, logger) + svc, err := newService(database, msgSub, writersPub, alarmsPub, ec, logger) if err != nil { logger.Error(fmt.Sprintf("failed to create services: %s", err)) exitCode = 1 + return } subCfg := messaging.SubscriberConfig{ ID: svcName, - Topic: channelsTopic, + Topic: smqbrokers.SubjectAllChannels, DeliveryPolicy: messaging.DeliverAllPolicy, Handler: svc, } - if err := rePubSub.Subscribe(ctx, subCfg); err != nil { + if err := msgSub.Subscribe(ctx, subCfg); err != nil { logger.Error(fmt.Sprintf("failed to subscribe to internal message broker: %s", err)) exitCode = 1 + return } @@ -249,12 +214,10 @@ func main() { go chc.CallHome(ctx) } - // Start scheduler g.Go(func() error { return svc.StartScheduler(ctx) }) - // Start all servers g.Go(func() error { return httpSvc.Start() }) @@ -268,7 +231,7 @@ func main() { } } -func newService(ctx context.Context, db pgclient.Database, rePubSub messaging.PubSub, writersPub, alarmsPub messaging.Publisher, authz mgauthz.Authorization, esURL string, tracer trace.Tracer, ec email.Config, logger *slog.Logger) (re.Service, error) { +func newService(db pgclient.Database, rePubSub messaging.PubSub, writersPub, alarmsPub messaging.Publisher, ec email.Config, logger *slog.Logger) (re.Service, error) { repo := repg.NewRepository(db) idp := uuid.New() @@ -277,7 +240,6 @@ func newService(ctx context.Context, db pgclient.Database, rePubSub messaging.Pu logger.Error(fmt.Sprintf("failed to configure e-mailing util: %s", err.Error())) } - // csvc = authzmw.AuthorizationMiddleware(csvc, authz) csvc := re.NewService(repo, idp, rePubSub, writersPub, alarmsPub, re.NewTicker(time.Minute), emailerClient) csvc = middleware.LoggingMiddleware(csvc, logger) diff --git a/cmd/timescale-writer/main.go b/cmd/timescale-writer/main.go index 5148c38da..951a26374 100644 --- a/cmd/timescale-writer/main.go +++ b/cmd/timescale-writer/main.go @@ -15,12 +15,12 @@ import ( chclient "github.com/absmach/callhome/pkg/client" consumertracing "github.com/absmach/magistrala/consumers/tracing" httpapi "github.com/absmach/magistrala/consumers/writers/api" + "github.com/absmach/magistrala/consumers/writers/brokers" "github.com/absmach/magistrala/consumers/writers/timescale" "github.com/absmach/supermq" "github.com/absmach/supermq/consumers" smqlog "github.com/absmach/supermq/logger" jaegerclient "github.com/absmach/supermq/pkg/jaeger" - "github.com/absmach/supermq/pkg/messaging/brokers" brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing" pgclient "github.com/absmach/supermq/pkg/postgres" "github.com/absmach/supermq/pkg/prometheus" diff --git a/consumers/writers/brokers/brokers_nats.go b/consumers/writers/brokers/brokers_nats.go new file mode 100644 index 000000000..83206ed56 --- /dev/null +++ b/consumers/writers/brokers/brokers_nats.go @@ -0,0 +1,49 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +//go:build !rabbitmq +// +build !rabbitmq + +package brokers + +import ( + "context" + "log/slog" + "time" + + "github.com/absmach/supermq/pkg/messaging" + broker "github.com/absmach/supermq/pkg/messaging/nats" + "github.com/nats-io/nats.go/jetstream" +) + +const AllTopic = "writers.>" + +var cfg = jetstream.StreamConfig{ + Name: "writers", + Description: "SuperMQ Rules Engine stream for handling internal messages", + Subjects: []string{"writers.>"}, + Retention: jetstream.LimitsPolicy, + MaxMsgsPerSubject: 1e6, + MaxAge: time.Hour * 24, + MaxMsgSize: 1024 * 1024, + Discard: jetstream.DiscardOld, + Storage: jetstream.FileStorage, +} + +func NewPubSub(ctx context.Context, url string, logger *slog.Logger) (messaging.PubSub, error) { + pb, err := broker.NewPubSub(ctx, url, logger, broker.JSStreamConfig(cfg)) + if err != nil { + return nil, err + } + + return pb, nil +} + +func NewPublisher(ctx context.Context, url string) (messaging.Publisher, error) { + pb, err := broker.NewPublisher(ctx, url, broker.JSStreamConfig(cfg)) + if err != nil { + return nil, err + } + + return pb, nil +} diff --git a/consumers/writers/brokers/brokers_rabbitmq.go b/consumers/writers/brokers/brokers_rabbitmq.go new file mode 100644 index 000000000..aee3d1ec1 --- /dev/null +++ b/consumers/writers/brokers/brokers_rabbitmq.go @@ -0,0 +1,35 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +//go:build rabbitmq +// +build rabbitmq + +package brokers + +import ( + "context" + "log/slog" + + "github.com/absmach/supermq/pkg/messaging" + broker "github.com/absmach/supermq/pkg/messaging/rabbitmq" +) + +const AllTopic = "writers.#" + +func NewPubSub(_ context.Context, url string, logger *slog.Logger) (messaging.PubSub, error) { + pb, err := broker.NewPubSub(url, logger, broker.Prefix("writers")) + if err != nil { + return nil, err + } + + return pb, nil +} + +func NewPublisher(_ context.Context, url string) (messaging.Publisher, error) { + pb, err := broker.NewPublisher(url, broker.Prefix("writers")) + if err != nil { + return nil, err + } + + return pb, nil +}