mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 06:10:19 +00:00
NOISSUE - Fix nil PubSub
Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>
This commit is contained in:
+6
-6
@@ -28,6 +28,7 @@ import (
|
||||
authzsvc "github.com/absmach/supermq/pkg/authz/authsvc"
|
||||
"github.com/absmach/supermq/pkg/grpcclient"
|
||||
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
|
||||
"github.com/absmach/supermq/pkg/messaging"
|
||||
"github.com/absmach/supermq/pkg/messaging/brokers"
|
||||
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
|
||||
pgclient "github.com/absmach/supermq/pkg/postgres"
|
||||
@@ -36,7 +37,6 @@ import (
|
||||
"github.com/absmach/supermq/pkg/uuid"
|
||||
"github.com/caarlos0/env/v11"
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
@@ -173,7 +173,8 @@ func main() {
|
||||
defer authzClient.Close()
|
||||
logger.Info("AuthZ successfully connected to auth gRPC server " + authnClient.Secure())
|
||||
|
||||
svc, err := newService(ctx, db, dbConfig, authz, cfg.ESURL, tracer, ec, logger)
|
||||
database := pgclient.NewDatabase(db, dbConfig, tracer)
|
||||
svc, err := newService(ctx, database, pubSub, authz, cfg.ESURL, tracer, ec, logger)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("failed to create services: %s", err))
|
||||
exitCode = 1
|
||||
@@ -213,9 +214,8 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func newService(ctx context.Context, db *sqlx.DB, dbConfig pgclient.Config, authz mgauthz.Authorization, esURL string, tracer trace.Tracer, ec email.Config, logger *slog.Logger) (re.Service, error) {
|
||||
database := pgclient.NewDatabase(db, dbConfig, tracer)
|
||||
repo := repg.NewRepository(database)
|
||||
func newService(ctx context.Context, db pgclient.Database, pubsub messaging.PubSub, authz mgauthz.Authorization, esURL string, tracer trace.Tracer, ec email.Config, logger *slog.Logger) (re.Service, error) {
|
||||
repo := repg.NewRepository(db)
|
||||
idp := uuid.New()
|
||||
|
||||
emailerClient, err := emailer.New(&ec)
|
||||
@@ -224,7 +224,7 @@ func newService(ctx context.Context, db *sqlx.DB, dbConfig pgclient.Config, auth
|
||||
}
|
||||
|
||||
// csvc = authzmw.AuthorizationMiddleware(csvc, authz)
|
||||
csvc := re.NewService(repo, idp, nil, re.NewTicker(time.Minute), emailerClient)
|
||||
csvc := re.NewService(repo, idp, pubsub, re.NewTicker(time.Minute), emailerClient)
|
||||
csvc = middleware.LoggingMiddleware(csvc, logger)
|
||||
|
||||
return csvc, nil
|
||||
|
||||
+3
-1
@@ -34,6 +34,8 @@ const (
|
||||
hoursInDay = 24
|
||||
daysInWeek = 7
|
||||
monthsInYear = 12
|
||||
|
||||
publisher = "magistrala.re"
|
||||
)
|
||||
|
||||
var ErrInvalidRecurringType = errors.New("invalid recurring type")
|
||||
@@ -321,7 +323,7 @@ func (re *re) process(ctx context.Context, r Rule, msg interface{}) error {
|
||||
return nil
|
||||
}
|
||||
m := &messaging.Message{
|
||||
Publisher: "magistrala.re",
|
||||
Publisher: publisher,
|
||||
Created: time.Now().Unix(),
|
||||
Payload: []byte(result.String()),
|
||||
Channel: r.OutputChannel,
|
||||
|
||||
Reference in New Issue
Block a user