NOISSUE - Remove SuperMQ duplicates (#23)

* Update docker-compose to use SuperMQ

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Remove duplicate services

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Update Bootstrap

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Update other services to use SMQ

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Switch config prefix to SMQ

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Remove leftovers

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Remove duplicate interface definitions

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Remove unused actions

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Remove unused API docs

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Resolve linter comments

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

* Fix provision

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>

---------

Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>
This commit is contained in:
Dušan Borovčanin
2024-12-31 11:04:17 +01:00
committed by GitHub
parent 57c3ecb175
commit 3bbb25bd64
699 changed files with 4836 additions and 130238 deletions
-233
View File
@@ -1,233 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"fmt"
"log"
"log/slog"
"net/url"
"os"
"time"
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
"github.com/absmach/magistrala/auth"
api "github.com/absmach/magistrala/auth/api"
authgrpcapi "github.com/absmach/magistrala/auth/api/grpc/auth"
domainsgrpcapi "github.com/absmach/magistrala/auth/api/grpc/domains"
tokengrpcapi "github.com/absmach/magistrala/auth/api/grpc/token"
httpapi "github.com/absmach/magistrala/auth/api/http"
"github.com/absmach/magistrala/auth/events"
"github.com/absmach/magistrala/auth/jwt"
apostgres "github.com/absmach/magistrala/auth/postgres"
"github.com/absmach/magistrala/auth/tracing"
mglog "github.com/absmach/magistrala/logger"
"github.com/absmach/magistrala/pkg/jaeger"
"github.com/absmach/magistrala/pkg/policies/spicedb"
"github.com/absmach/magistrala/pkg/postgres"
pgclient "github.com/absmach/magistrala/pkg/postgres"
"github.com/absmach/magistrala/pkg/prometheus"
"github.com/absmach/magistrala/pkg/server"
grpcserver "github.com/absmach/magistrala/pkg/server/grpc"
httpserver "github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/authzed/authzed-go/v1"
"github.com/authzed/grpcutil"
"github.com/caarlos0/env/v11"
"github.com/jmoiron/sqlx"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/reflection"
)
const (
svcName = "auth"
envPrefixHTTP = "MG_AUTH_HTTP_"
envPrefixGrpc = "MG_AUTH_GRPC_"
envPrefixDB = "MG_AUTH_DB_"
defDB = "auth"
defSvcHTTPPort = "8189"
defSvcGRPCPort = "8181"
)
type config struct {
LogLevel string `env:"MG_AUTH_LOG_LEVEL" envDefault:"info"`
SecretKey string `env:"MG_AUTH_SECRET_KEY" envDefault:"secret"`
JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MG_AUTH_ADAPTER_INSTANCE_ID" envDefault:""`
AccessDuration time.Duration `env:"MG_AUTH_ACCESS_TOKEN_DURATION" envDefault:"1h"`
RefreshDuration time.Duration `env:"MG_AUTH_REFRESH_TOKEN_DURATION" envDefault:"24h"`
InvitationDuration time.Duration `env:"MG_AUTH_INVITATION_DURATION" envDefault:"168h"`
SpicedbHost string `env:"MG_SPICEDB_HOST" envDefault:"localhost"`
SpicedbPort string `env:"MG_SPICEDB_PORT" envDefault:"50051"`
SpicedbSchemaFile string `env:"MG_SPICEDB_SCHEMA_FILE" envDefault:"./docker/spicedb/schema.zed"`
SpicedbPreSharedKey string `env:"MG_SPICEDB_PRE_SHARED_KEY" envDefault:"12345678"`
TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ESURL string `env:"MG_ES_URL" envDefault:"nats://localhost:4222"`
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
cfg := config{}
if err := env.Parse(&cfg); err != nil {
log.Fatalf("failed to load %s configuration : %s", svcName, err.Error())
}
logger, err := mglog.New(os.Stdout, cfg.LogLevel)
if err != nil {
log.Fatalf("failed to init logger: %s", err.Error())
}
var exitCode int
defer mglog.ExitWithError(&exitCode)
if cfg.InstanceID == "" {
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
logger.Error(fmt.Sprintf("failed to generate instanceID: %s", err))
exitCode = 1
return
}
}
dbConfig := pgclient.Config{Name: defDB}
if err := env.ParseWithOptions(&dbConfig, env.Options{Prefix: envPrefixDB}); err != nil {
logger.Error(err.Error())
}
db, err := pgclient.Setup(dbConfig, *apostgres.Migration())
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer db.Close()
tp, err := jaeger.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID, cfg.TraceRatio)
if err != nil {
logger.Error(fmt.Sprintf("failed to init Jaeger: %s", err))
exitCode = 1
return
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
logger.Error(fmt.Sprintf("error shutting down tracer provider: %v", err))
}
}()
tracer := tp.Tracer(svcName)
spicedbclient, err := initSpiceDB(ctx, cfg)
if err != nil {
logger.Error(fmt.Sprintf("failed to init spicedb grpc client : %s\n", err.Error()))
exitCode = 1
return
}
svc := newService(ctx, db, tracer, cfg, dbConfig, logger, spicedbclient)
httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err.Error()))
exitCode = 1
return
}
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(svc, logger, cfg.InstanceID), logger)
grpcServerConfig := server.Config{Port: defSvcGRPCPort}
if err := env.ParseWithOptions(&grpcServerConfig, env.Options{Prefix: envPrefixGrpc}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s gRPC server configuration : %s", svcName, err.Error()))
exitCode = 1
return
}
registerAuthServiceServer := func(srv *grpc.Server) {
reflection.Register(srv)
magistrala.RegisterTokenServiceServer(srv, tokengrpcapi.NewTokenServer(svc))
magistrala.RegisterDomainsServiceServer(srv, domainsgrpcapi.NewDomainsServer(svc))
magistrala.RegisterAuthServiceServer(srv, authgrpcapi.NewAuthServer(svc))
}
gs := grpcserver.NewServer(ctx, cancel, svcName, grpcServerConfig, registerAuthServiceServer, logger)
if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
go chc.CallHome(ctx)
}
g.Go(func() error {
return hs.Start()
})
g.Go(func() error {
return gs.Start()
})
g.Go(func() error {
return server.StopSignalHandler(ctx, cancel, logger, svcName, hs, gs)
})
if err := g.Wait(); err != nil {
logger.Error(fmt.Sprintf("users service terminated: %s", err))
}
}
func initSpiceDB(ctx context.Context, cfg config) (*authzed.ClientWithExperimental, error) {
client, err := authzed.NewClientWithExperimentalAPIs(
fmt.Sprintf("%s:%s", cfg.SpicedbHost, cfg.SpicedbPort),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpcutil.WithInsecureBearerToken(cfg.SpicedbPreSharedKey),
)
if err != nil {
return client, err
}
if err := initSchema(ctx, client, cfg.SpicedbSchemaFile); err != nil {
return client, err
}
return client, nil
}
func initSchema(ctx context.Context, client *authzed.ClientWithExperimental, schemaFilePath string) error {
schemaContent, err := os.ReadFile(schemaFilePath)
if err != nil {
return fmt.Errorf("failed to read spice db schema file : %w", err)
}
if _, err = client.SchemaServiceClient.WriteSchema(ctx, &v1.WriteSchemaRequest{Schema: string(schemaContent)}); err != nil {
return fmt.Errorf("failed to create schema in spicedb : %w", err)
}
return nil
}
func newService(ctx context.Context, db *sqlx.DB, tracer trace.Tracer, cfg config, dbConfig pgclient.Config, logger *slog.Logger, spicedbClient *authzed.ClientWithExperimental) auth.Service {
database := postgres.NewDatabase(db, dbConfig, tracer)
keysRepo := apostgres.New(database)
domainsRepo := apostgres.NewDomainRepository(database)
idProvider := uuid.New()
pEvaluator := spicedb.NewPolicyEvaluator(spicedbClient, logger)
pService := spicedb.NewPolicyService(spicedbClient, logger)
t := jwt.New([]byte(cfg.SecretKey))
svc := auth.New(keysRepo, domainsRepo, idProvider, t, pEvaluator, pService, cfg.AccessDuration, cfg.RefreshDuration, cfg.InvitationDuration)
svc, err := events.NewEventStoreMiddleware(ctx, svc, cfg.ESURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to init event store middleware : %s", err))
return nil
}
svc = api.LoggingMiddleware(svc, logger)
counter, latency := prometheus.MakeMetrics("groups", "api")
svc = api.MetricsMiddleware(svc, counter, latency)
svc = tracing.New(svc, tracer)
return svc
}
+71 -55
View File
@@ -13,30 +13,31 @@ import (
"os"
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
"github.com/absmach/magistrala/bootstrap"
"github.com/absmach/magistrala/bootstrap/api"
"github.com/absmach/magistrala/bootstrap/events/consumer"
"github.com/absmach/magistrala/bootstrap/events/producer"
"github.com/absmach/magistrala/bootstrap/middleware"
bootstrappg "github.com/absmach/magistrala/bootstrap/postgres"
"github.com/absmach/magistrala/bootstrap/tracing"
mglog "github.com/absmach/magistrala/logger"
authsvcAuthn "github.com/absmach/magistrala/pkg/authn/authsvc"
mgauthz "github.com/absmach/magistrala/pkg/authz"
authsvcAuthz "github.com/absmach/magistrala/pkg/authz/authsvc"
"github.com/absmach/magistrala/pkg/events"
"github.com/absmach/magistrala/pkg/events/store"
"github.com/absmach/magistrala/pkg/grpcclient"
"github.com/absmach/magistrala/pkg/jaeger"
"github.com/absmach/magistrala/pkg/policies"
"github.com/absmach/magistrala/pkg/policies/spicedb"
pgclient "github.com/absmach/magistrala/pkg/postgres"
"github.com/absmach/magistrala/pkg/prometheus"
mgsdk "github.com/absmach/magistrala/pkg/sdk/go"
"github.com/absmach/magistrala/pkg/server"
httpserver "github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
"github.com/absmach/supermq"
"github.com/absmach/supermq/bootstrap"
httpapi "github.com/absmach/supermq/bootstrap/api"
"github.com/absmach/supermq/bootstrap/events/consumer"
"github.com/absmach/supermq/bootstrap/events/producer"
"github.com/absmach/supermq/bootstrap/middleware"
bootstrappg "github.com/absmach/supermq/bootstrap/postgres"
"github.com/absmach/supermq/bootstrap/tracing"
smqlog "github.com/absmach/supermq/logger"
authsvcAuthn "github.com/absmach/supermq/pkg/authn/authsvc"
smqauthz "github.com/absmach/supermq/pkg/authz"
authsvcAuthz "github.com/absmach/supermq/pkg/authz/authsvc"
domainsAuthz "github.com/absmach/supermq/pkg/domains/grpcclient"
"github.com/absmach/supermq/pkg/events"
"github.com/absmach/supermq/pkg/events/store"
"github.com/absmach/supermq/pkg/grpcclient"
"github.com/absmach/supermq/pkg/jaeger"
"github.com/absmach/supermq/pkg/policies"
"github.com/absmach/supermq/pkg/policies/spicedb"
pgclient "github.com/absmach/supermq/pkg/postgres"
"github.com/absmach/supermq/pkg/prometheus"
mgsdk "github.com/absmach/supermq/pkg/sdk"
"github.com/absmach/supermq/pkg/server"
httpserver "github.com/absmach/supermq/pkg/server/http"
"github.com/absmach/supermq/pkg/uuid"
"github.com/authzed/authzed-go/v1"
"github.com/authzed/grpcutil"
"github.com/caarlos0/env/v11"
@@ -48,30 +49,31 @@ import (
)
const (
svcName = "bootstrap"
envPrefixDB = "MG_BOOTSTRAP_DB_"
envPrefixHTTP = "MG_BOOTSTRAP_HTTP_"
envPrefixAuth = "MG_AUTH_GRPC_"
defDB = "bootstrap"
defSvcHTTPPort = "9013"
svcName = "bootstrap"
envPrefixDB = "SMQ_BOOTSTRAP_DB_"
envPrefixHTTP = "SMQ_BOOTSTRAP_HTTP_"
envPrefixAuth = "SMQ_AUTH_GRPC_"
envPrefixDomains = "SMQ_DOMAINS_GRPC_"
defDB = "bootstrap"
defSvcHTTPPort = "9013"
thingsStream = "events.magistrala.things"
streamID = "magistrala.bootstrap"
stream = "events.supermq.clients"
streamID = "supermq.bootstrap"
)
type config struct {
LogLevel string `env:"MG_BOOTSTRAP_LOG_LEVEL" envDefault:"info"`
EncKey string `env:"MG_BOOTSTRAP_ENCRYPT_KEY" envDefault:"12345678910111213141516171819202"`
ESConsumerName string `env:"MG_BOOTSTRAP_EVENT_CONSUMER" envDefault:"bootstrap"`
ThingsURL string `env:"MG_THINGS_URL" envDefault:"http://localhost:9000"`
JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MG_BOOTSTRAP_INSTANCE_ID" envDefault:""`
ESURL string `env:"MG_ES_URL" envDefault:"nats://localhost:4222"`
TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"`
SpicedbHost string `env:"MG_SPICEDB_HOST" envDefault:"localhost"`
SpicedbPort string `env:"MG_SPICEDB_PORT" envDefault:"50051"`
SpicedbPreSharedKey string `env:"MG_SPICEDB_PRE_SHARED_KEY" envDefault:"12345678"`
LogLevel string `env:"SMQ_BOOTSTRAP_LOG_LEVEL" envDefault:"info"`
EncKey string `env:"SMQ_BOOTSTRAP_ENCRYPT_KEY" envDefault:"12345678910111213141516171819202"`
ESConsumerName string `env:"SMQ_BOOTSTRAP_EVENT_CONSUMER" envDefault:"bootstrap"`
ClientsURL string `env:"SMQ_CLIENTS_URL" envDefault:"http://localhost:9000"`
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_BOOTSTRAP_INSTANCE_ID" envDefault:""`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
SpicedbHost string `env:"SMQ_SPICEDB_HOST" envDefault:"localhost"`
SpicedbPort string `env:"SMQ_SPICEDB_PORT" envDefault:"50051"`
SpicedbPreSharedKey string `env:"SMQ_SPICEDB_PRE_SHARED_KEY" envDefault:"12345678"`
}
func main() {
@@ -83,13 +85,13 @@ func main() {
log.Fatalf("failed to load %s configuration : %s", svcName, err)
}
logger, err := mglog.New(os.Stdout, cfg.LogLevel)
logger, err := smqlog.New(os.Stdout, cfg.LogLevel)
if err != nil {
log.Fatalf("failed to init logger: %s", err.Error())
}
var exitCode int
defer mglog.ExitWithError(&exitCode)
defer smqlog.ExitWithError(&exitCode)
if cfg.InstanceID == "" {
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
@@ -148,7 +150,21 @@ func main() {
logger.Info("AuthN successfully connected to auth gRPC server " + authnClient.Secure())
defer authnClient.Close()
authz, authzClient, err := authsvcAuthz.NewAuthorization(ctx, grpcCfg)
domsGrpcCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&domsGrpcCfg, env.Options{Prefix: envPrefixDomains}); err != nil {
logger.Error(fmt.Sprintf("failed to load domains gRPC client configuration : %s", err))
exitCode = 1
return
}
domainsAuthz, _, domainsHandler, err := domainsAuthz.NewAuthorization(ctx, domsGrpcCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer domainsHandler.Close()
authz, authzClient, err := authsvcAuthz.NewAuthorization(ctx, grpcCfg, domainsAuthz)
if err != nil {
logger.Error(err.Error())
exitCode = 1
@@ -165,8 +181,8 @@ func main() {
return
}
if err = subscribeToThingsES(ctx, svc, cfg, logger); err != nil {
logger.Error(fmt.Sprintf("failed to subscribe to things event store: %s", err))
if err = subscribeToClientsES(ctx, svc, cfg, logger); err != nil {
logger.Error(fmt.Sprintf("failed to subscribe to clients event store: %s", err))
exitCode = 1
return
}
@@ -179,10 +195,10 @@ func main() {
exitCode = 1
return
}
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, authn, bootstrap.NewConfigReader([]byte(cfg.EncKey)), logger, cfg.InstanceID), logger)
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(svc, authn, bootstrap.NewConfigReader([]byte(cfg.EncKey)), logger, cfg.InstanceID), logger)
if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
chc := chclient.New(svcName, supermq.Version, logger, cancel)
go chc.CallHome(ctx)
}
@@ -199,13 +215,13 @@ func main() {
}
}
func newService(ctx context.Context, authz mgauthz.Authorization, policySvc policies.Service, db *sqlx.DB, tracer trace.Tracer, logger *slog.Logger, cfg config, dbConfig pgclient.Config) (bootstrap.Service, error) {
func newService(ctx context.Context, authz smqauthz.Authorization, policySvc policies.Service, db *sqlx.DB, tracer trace.Tracer, logger *slog.Logger, cfg config, dbConfig pgclient.Config) (bootstrap.Service, error) {
database := pgclient.NewDatabase(db, dbConfig, tracer)
repoConfig := bootstrappg.NewConfigRepository(database, logger)
config := mgsdk.Config{
ThingsURL: cfg.ThingsURL,
ClientsURL: cfg.ClientsURL,
}
sdk := mgsdk.NewSDK(config)
@@ -228,14 +244,14 @@ func newService(ctx context.Context, authz mgauthz.Authorization, policySvc poli
return svc, nil
}
func subscribeToThingsES(ctx context.Context, svc bootstrap.Service, cfg config, logger *slog.Logger) error {
func subscribeToClientsES(ctx context.Context, svc bootstrap.Service, cfg config, logger *slog.Logger) error {
subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, logger)
if err != nil {
return err
}
subConfig := events.SubscriberConfig{
Stream: thingsStream,
Stream: stream,
Consumer: cfg.ESConsumerName,
Handler: consumer.NewEventHandler(svc),
}
-168
View File
@@ -1,168 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package main contains certs main function to start the certs service.
package main
import (
"context"
"fmt"
"log"
"log/slog"
"net/url"
"os"
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
"github.com/absmach/magistrala/certs"
"github.com/absmach/magistrala/certs/api"
pki "github.com/absmach/magistrala/certs/pki/amcerts"
"github.com/absmach/magistrala/certs/tracing"
mglog "github.com/absmach/magistrala/logger"
authsvcAuthn "github.com/absmach/magistrala/pkg/authn/authsvc"
"github.com/absmach/magistrala/pkg/grpcclient"
jaegerclient "github.com/absmach/magistrala/pkg/jaeger"
"github.com/absmach/magistrala/pkg/prometheus"
mgsdk "github.com/absmach/magistrala/pkg/sdk/go"
"github.com/absmach/magistrala/pkg/server"
httpserver "github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
"github.com/caarlos0/env/v11"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)
const (
svcName = "certs"
envPrefixDB = "MG_CERTS_DB_"
envPrefixHTTP = "MG_CERTS_HTTP_"
envPrefixAuth = "MG_AUTH_GRPC_"
defDB = "certs"
defSvcHTTPPort = "9019"
)
type config struct {
LogLevel string `env:"MG_CERTS_LOG_LEVEL" envDefault:"info"`
ThingsURL string `env:"MG_THINGS_URL" envDefault:"http://localhost:9000"`
JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MG_CERTS_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"`
// Sign and issue certificates without 3rd party PKI
SignCAPath string `env:"MG_CERTS_SIGN_CA_PATH" envDefault:"ca.crt"`
SignCAKeyPath string `env:"MG_CERTS_SIGN_CA_KEY_PATH" envDefault:"ca.key"`
// Amcerts SDK settings
SDKHost string `env:"MG_CERTS_SDK_HOST" envDefault:""`
SDKCertsURL string `env:"MG_CERTS_SDK_CERTS_URL" envDefault:"http://localhost:9010"`
TLSVerification bool `env:"MG_CERTS_SDK_TLS_VERIFICATION" envDefault:"false"`
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
cfg := config{}
if err := env.Parse(&cfg); err != nil {
log.Fatalf("failed to load %s configuration : %s", svcName, err)
}
logger, err := mglog.New(os.Stdout, cfg.LogLevel)
if err != nil {
log.Fatalf("failed to init logger: %s", err.Error())
}
var exitCode int
defer mglog.ExitWithError(&exitCode)
if cfg.InstanceID == "" {
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
logger.Error(fmt.Sprintf("failed to generate instanceID: %s", err))
exitCode = 1
return
}
}
if cfg.SDKHost == "" {
logger.Error("No host specified for PKI engine")
exitCode = 1
return
}
pkiclient, err := pki.NewAgent(cfg.SDKHost, cfg.SDKCertsURL, cfg.TLSVerification)
if err != nil {
logger.Error("failed to configure client for PKI engine")
exitCode = 1
return
}
grpcCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&grpcCfg, env.Options{Prefix: envPrefixAuth}); err != nil {
logger.Error(fmt.Sprintf("failed to load auth gRPC client configuration : %s", err))
exitCode = 1
return
}
authn, authnClient, err := authsvcAuthn.NewAuthentication(ctx, grpcCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer authnClient.Close()
logger.Info("AuthN successfully connected to auth gRPC server " + authnClient.Secure())
tp, err := jaegerclient.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID, cfg.TraceRatio)
if err != nil {
logger.Error(fmt.Sprintf("failed to init Jaeger: %s", err))
exitCode = 1
return
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
logger.Error(fmt.Sprintf("error shutting down tracer provider: %v", err))
}
}()
tracer := tp.Tracer(svcName)
svc := newService(tracer, logger, cfg, pkiclient)
httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, authn, logger, cfg.InstanceID), logger)
if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
go chc.CallHome(ctx)
}
g.Go(func() error {
return hs.Start()
})
g.Go(func() error {
return server.StopSignalHandler(ctx, cancel, logger, svcName, hs)
})
if err := g.Wait(); err != nil {
logger.Error(fmt.Sprintf("Certs service terminated: %s", err))
}
}
func newService(tracer trace.Tracer, logger *slog.Logger, cfg config, pkiAgent pki.Agent) certs.Service {
config := mgsdk.Config{
ThingsURL: cfg.ThingsURL,
}
sdk := mgsdk.NewSDK(config)
svc := certs.New(sdk, pkiAgent)
svc = api.LoggingMiddleware(svc, logger)
counter, latency := prometheus.MakeMetrics(svcName, "api")
svc = api.MetricsMiddleware(svc, counter, latency)
svc = tracing.New(svc, tracer)
return svc
}
-263
View File
@@ -1,263 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package main contains cli main function to run the cli.
package main
import (
"log"
"github.com/absmach/magistrala/cli"
sdk "github.com/absmach/magistrala/pkg/sdk/go"
"github.com/spf13/cobra"
)
func main() {
msgContentType := string(sdk.CTJSONSenML)
sdkConf := sdk.Config{
MsgContentType: sdk.ContentType(msgContentType),
}
// Root
rootCmd := &cobra.Command{
Use: "magistrala-cli",
PersistentPreRun: func(_ *cobra.Command, _ []string) {
cliConf, err := cli.ParseConfig(sdkConf)
if err != nil {
log.Fatalf("Failed to parse config: %s", err)
}
if cliConf.MsgContentType == "" {
cliConf.MsgContentType = sdk.ContentType(msgContentType)
}
s := sdk.NewSDK(cliConf)
cli.SetSDK(s)
},
}
// API commands
healthCmd := cli.NewHealthCmd()
usersCmd := cli.NewUsersCmd()
domainsCmd := cli.NewDomainsCmd()
thingsCmd := cli.NewThingsCmd()
groupsCmd := cli.NewGroupsCmd()
channelsCmd := cli.NewChannelsCmd()
messagesCmd := cli.NewMessagesCmd()
provisionCmd := cli.NewProvisionCmd()
bootstrapCmd := cli.NewBootstrapCmd()
certsCmd := cli.NewCertsCmd()
subscriptionsCmd := cli.NewSubscriptionCmd()
configCmd := cli.NewConfigCmd()
invitationsCmd := cli.NewInvitationsCmd()
journalCmd := cli.NewJournalCmd()
// Root Commands
rootCmd.AddCommand(healthCmd)
rootCmd.AddCommand(usersCmd)
rootCmd.AddCommand(domainsCmd)
rootCmd.AddCommand(groupsCmd)
rootCmd.AddCommand(thingsCmd)
rootCmd.AddCommand(channelsCmd)
rootCmd.AddCommand(messagesCmd)
rootCmd.AddCommand(provisionCmd)
rootCmd.AddCommand(bootstrapCmd)
rootCmd.AddCommand(certsCmd)
rootCmd.AddCommand(subscriptionsCmd)
rootCmd.AddCommand(configCmd)
rootCmd.AddCommand(invitationsCmd)
rootCmd.AddCommand(journalCmd)
// Root Flags
rootCmd.PersistentFlags().StringVarP(
&sdkConf.BootstrapURL,
"bootstrap-url",
"b",
sdkConf.BootstrapURL,
"Bootstrap service URL",
)
rootCmd.PersistentFlags().StringVarP(
&sdkConf.CertsURL,
"certs-url",
"s",
sdkConf.CertsURL,
"Certs service URL",
)
rootCmd.PersistentFlags().StringVarP(
&sdkConf.ThingsURL,
"things-url",
"t",
sdkConf.ThingsURL,
"Things service URL",
)
rootCmd.PersistentFlags().StringVarP(
&sdkConf.UsersURL,
"users-url",
"u",
sdkConf.UsersURL,
"Users service URL",
)
rootCmd.PersistentFlags().StringVarP(
&sdkConf.DomainsURL,
"domains-url",
"d",
sdkConf.DomainsURL,
"Domains service URL",
)
rootCmd.PersistentFlags().StringVarP(
&sdkConf.HTTPAdapterURL,
"http-url",
"p",
sdkConf.HTTPAdapterURL,
"HTTP adapter URL",
)
rootCmd.PersistentFlags().StringVarP(
&sdkConf.ReaderURL,
"reader-url",
"R",
sdkConf.ReaderURL,
"Reader URL",
)
rootCmd.PersistentFlags().StringVarP(
&sdkConf.InvitationsURL,
"invitations-url",
"v",
sdkConf.InvitationsURL,
"Inivitations URL",
)
rootCmd.PersistentFlags().StringVarP(
&sdkConf.JournalURL,
"journal-url",
"a",
sdkConf.JournalURL,
"Journal Log URL",
)
rootCmd.PersistentFlags().StringVarP(
&sdkConf.HostURL,
"host-url",
"H",
sdkConf.HostURL,
"Host URL",
)
rootCmd.PersistentFlags().StringVarP(
&msgContentType,
"content-type",
"y",
msgContentType,
"Message content type",
)
rootCmd.PersistentFlags().BoolVarP(
&sdkConf.TLSVerification,
"insecure",
"i",
sdkConf.TLSVerification,
"Do not check for TLS cert",
)
rootCmd.PersistentFlags().StringVarP(
&cli.ConfigPath,
"config",
"c",
cli.ConfigPath,
"Config path",
)
rootCmd.PersistentFlags().BoolVarP(
&cli.RawOutput,
"raw",
"r",
cli.RawOutput,
"Enables raw output mode for easier parsing of output",
)
rootCmd.PersistentFlags().BoolVarP(
&sdkConf.CurlFlag,
"curl",
"x",
false,
"Convert HTTP request to cURL command",
)
// Client and Channels Flags
rootCmd.PersistentFlags().Uint64VarP(
&cli.Limit,
"limit",
"l",
10,
"Limit query parameter",
)
rootCmd.PersistentFlags().Uint64VarP(
&cli.Offset,
"offset",
"o",
0,
"Offset query parameter",
)
rootCmd.PersistentFlags().StringVarP(
&cli.Name,
"name",
"n",
"",
"Name query parameter",
)
rootCmd.PersistentFlags().StringVarP(
&cli.Identity,
"identity",
"I",
"",
"User identity query parameter",
)
rootCmd.PersistentFlags().StringVarP(
&cli.Metadata,
"metadata",
"m",
"",
"Metadata query parameter",
)
rootCmd.PersistentFlags().StringVarP(
&cli.Status,
"status",
"S",
"",
"User status query parameter",
)
rootCmd.PersistentFlags().StringVarP(
&cli.State,
"state",
"z",
"",
"Bootstrap state query parameter",
)
rootCmd.PersistentFlags().StringVarP(
&cli.Topic,
"topic",
"T",
"",
"Subscription topic query parameter",
)
rootCmd.PersistentFlags().StringVarP(
&cli.Contact,
"contact",
"C",
"",
"Subscription contact query parameter",
)
if err := rootCmd.Execute(); err != nil {
log.Fatal(err)
}
}
-160
View File
@@ -1,160 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package main contains coap-adapter main function to start the coap-adapter service.
package main
import (
"context"
"fmt"
"log"
"net/url"
"os"
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
"github.com/absmach/magistrala/coap"
"github.com/absmach/magistrala/coap/api"
"github.com/absmach/magistrala/coap/tracing"
mglog "github.com/absmach/magistrala/logger"
"github.com/absmach/magistrala/pkg/grpcclient"
jaegerclient "github.com/absmach/magistrala/pkg/jaeger"
"github.com/absmach/magistrala/pkg/messaging/brokers"
brokerstracing "github.com/absmach/magistrala/pkg/messaging/brokers/tracing"
"github.com/absmach/magistrala/pkg/prometheus"
"github.com/absmach/magistrala/pkg/server"
coapserver "github.com/absmach/magistrala/pkg/server/coap"
httpserver "github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
"github.com/caarlos0/env/v11"
"golang.org/x/sync/errgroup"
)
const (
svcName = "coap_adapter"
envPrefix = "MG_COAP_ADAPTER_"
envPrefixHTTP = "MG_COAP_ADAPTER_HTTP_"
envPrefixThings = "MG_THINGS_AUTH_GRPC_"
defSvcHTTPPort = "5683"
defSvcCoAPPort = "5683"
)
type config struct {
LogLevel string `env:"MG_COAP_ADAPTER_LOG_LEVEL" envDefault:"info"`
BrokerURL string `env:"MG_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MG_COAP_ADAPTER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"`
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
cfg := config{}
if err := env.Parse(&cfg); err != nil {
log.Fatalf("failed to load %s configuration : %s", svcName, err)
}
logger, err := mglog.New(os.Stdout, cfg.LogLevel)
if err != nil {
log.Fatalf("failed to init logger: %s", err.Error())
}
var exitCode int
defer mglog.ExitWithError(&exitCode)
if cfg.InstanceID == "" {
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
logger.Error(fmt.Sprintf("failed to generate instanceID: %s", err))
exitCode = 1
return
}
}
httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
coapServerConfig := server.Config{Port: defSvcCoAPPort}
if err := env.ParseWithOptions(&coapServerConfig, env.Options{Prefix: envPrefix}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s CoAP server configuration : %s", svcName, err))
exitCode = 1
return
}
thingsClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&thingsClientCfg, env.Options{Prefix: envPrefixThings}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s auth configuration : %s", svcName, err))
exitCode = 1
return
}
thingsClient, thingsHandler, err := grpcclient.SetupThingsClient(ctx, thingsClientCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer thingsHandler.Close()
logger.Info("Things service gRPC client successfully connected to things gRPC server " + thingsHandler.Secure())
tp, err := jaegerclient.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID, cfg.TraceRatio)
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err))
exitCode = 1
return
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
logger.Error(fmt.Sprintf("Error shutting down tracer provider: %v", err))
}
}()
tracer := tp.Tracer(svcName)
nps, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
defer nps.Close()
nps = brokerstracing.NewPubSub(coapServerConfig, tracer, nps)
svc := coap.New(thingsClient, nps)
svc = tracing.New(tracer, svc)
svc = api.LoggingMiddleware(svc, logger)
counter, latency := prometheus.MakeMetrics(svcName, "api")
svc = api.MetricsMiddleware(svc, counter, latency)
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(cfg.InstanceID), logger)
cs := coapserver.NewServer(ctx, cancel, svcName, coapServerConfig, api.MakeCoAPHandler(svc, logger), logger)
if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
go chc.CallHome(ctx)
}
g.Go(func() error {
return hs.Start()
})
g.Go(func() error {
return cs.Start()
})
g.Go(func() error {
return server.StopSignalHandler(ctx, cancel, logger, svcName, hs, cs)
})
if err := g.Wait(); err != nil {
logger.Error(fmt.Sprintf("CoAP adapter service terminated: %s", err))
}
}
-207
View File
@@ -1,207 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package main contains http-adapter main function to start the http-adapter service.
package main
import (
"context"
"crypto/tls"
"fmt"
"log"
"log/slog"
"net/http"
"net/url"
"os"
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
adapter "github.com/absmach/magistrala/http"
"github.com/absmach/magistrala/http/api"
mglog "github.com/absmach/magistrala/logger"
"github.com/absmach/magistrala/pkg/grpcclient"
jaegerclient "github.com/absmach/magistrala/pkg/jaeger"
"github.com/absmach/magistrala/pkg/messaging"
"github.com/absmach/magistrala/pkg/messaging/brokers"
brokerstracing "github.com/absmach/magistrala/pkg/messaging/brokers/tracing"
"github.com/absmach/magistrala/pkg/messaging/handler"
"github.com/absmach/magistrala/pkg/prometheus"
"github.com/absmach/magistrala/pkg/server"
httpserver "github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
"github.com/absmach/mgate"
mgatehttp "github.com/absmach/mgate/pkg/http"
"github.com/absmach/mgate/pkg/session"
"github.com/caarlos0/env/v11"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)
const (
svcName = "http_adapter"
envPrefix = "MG_HTTP_ADAPTER_"
envPrefixThings = "MG_THINGS_AUTH_GRPC_"
defSvcHTTPPort = "80"
targetHTTPPort = "81"
targetHTTPHost = "http://localhost"
)
type config struct {
LogLevel string `env:"MG_HTTP_ADAPTER_LOG_LEVEL" envDefault:"info"`
BrokerURL string `env:"MG_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MG_HTTP_ADAPTER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"`
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
cfg := config{}
if err := env.Parse(&cfg); err != nil {
log.Fatalf("failed to load %s configuration : %s", svcName, err)
}
logger, err := mglog.New(os.Stdout, cfg.LogLevel)
if err != nil {
log.Fatalf("failed to init logger: %s", err.Error())
}
var exitCode int
defer mglog.ExitWithError(&exitCode)
if cfg.InstanceID == "" {
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
logger.Error(fmt.Sprintf("failed to generate instanceID: %s", err))
exitCode = 1
return
}
}
httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefix}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
thingsClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&thingsClientCfg, env.Options{Prefix: envPrefixThings}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s auth configuration : %s", svcName, err))
exitCode = 1
return
}
thingsClient, thingsHandler, err := grpcclient.SetupThingsClient(ctx, thingsClientCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer thingsHandler.Close()
logger.Info("Things service gRPC client successfully connected to things gRPC server " + thingsHandler.Secure())
tp, err := jaegerclient.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID, cfg.TraceRatio)
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err))
exitCode = 1
return
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
logger.Error(fmt.Sprintf("Error shutting down tracer provider: %v", err))
}
}()
tracer := tp.Tracer(svcName)
pub, err := brokers.NewPublisher(ctx, cfg.BrokerURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
defer pub.Close()
pub = brokerstracing.NewPublisher(httpServerConfig, tracer, pub)
svc := newService(pub, thingsClient, logger, tracer)
targetServerCfg := server.Config{Port: targetHTTPPort}
hs := httpserver.NewServer(ctx, cancel, svcName, targetServerCfg, api.MakeHandler(logger, cfg.InstanceID), logger)
if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
go chc.CallHome(ctx)
}
g.Go(func() error {
return hs.Start()
})
g.Go(func() error {
return proxyHTTP(ctx, httpServerConfig, logger, svc)
})
g.Go(func() error {
return server.StopSignalHandler(ctx, cancel, logger, svcName, hs)
})
if err := g.Wait(); err != nil {
logger.Error(fmt.Sprintf("HTTP adapter service terminated: %s", err))
}
}
func newService(pub messaging.Publisher, tc magistrala.ThingsServiceClient, logger *slog.Logger, tracer trace.Tracer) session.Handler {
svc := adapter.NewHandler(pub, logger, tc)
svc = handler.NewTracing(tracer, svc)
svc = handler.LoggingMiddleware(svc, logger)
counter, latency := prometheus.MakeMetrics(svcName, "api")
svc = handler.MetricsMiddleware(svc, counter, latency)
return svc
}
func proxyHTTP(ctx context.Context, cfg server.Config, logger *slog.Logger, sessionHandler session.Handler) error {
config := mgate.Config{
Address: fmt.Sprintf("%s:%s", "", cfg.Port),
Target: fmt.Sprintf("%s:%s", targetHTTPHost, targetHTTPPort),
PathPrefix: "/",
}
if cfg.CertFile != "" || cfg.KeyFile != "" {
tlsCert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
if err != nil {
return err
}
config.TLSConfig = &tls.Config{
Certificates: []tls.Certificate{tlsCert},
}
}
mp, err := mgatehttp.NewProxy(config, sessionHandler, logger)
if err != nil {
return err
}
http.HandleFunc("/", mp.ServeHTTP)
errCh := make(chan error)
switch {
case cfg.CertFile != "" || cfg.KeyFile != "":
go func() {
errCh <- mp.Listen(ctx)
}()
logger.Info(fmt.Sprintf("%s service https server listening at %s:%s with TLS cert %s and key %s", svcName, cfg.Host, cfg.Port, cfg.CertFile, cfg.KeyFile))
default:
go func() {
errCh <- mp.Listen(ctx)
}()
logger.Info(fmt.Sprintf("%s service http server listening at %s:%s without TLS", svcName, cfg.Host, cfg.Port))
}
select {
case <-ctx.Done():
logger.Info(fmt.Sprintf("proxy HTTP shutdown at %s", config.Target))
return nil
case err := <-errCh:
return err
}
}
-196
View File
@@ -1,196 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package main contains invitations main function to start the invitations service.
package main
import (
"context"
"fmt"
"log"
"log/slog"
"net/url"
"os"
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
"github.com/absmach/magistrala/invitations"
"github.com/absmach/magistrala/invitations/api"
"github.com/absmach/magistrala/invitations/middleware"
invitationspg "github.com/absmach/magistrala/invitations/postgres"
mglog "github.com/absmach/magistrala/logger"
authsvcAuthn "github.com/absmach/magistrala/pkg/authn/authsvc"
mgauthz "github.com/absmach/magistrala/pkg/authz"
authsvcAuthz "github.com/absmach/magistrala/pkg/authz/authsvc"
"github.com/absmach/magistrala/pkg/grpcclient"
"github.com/absmach/magistrala/pkg/jaeger"
"github.com/absmach/magistrala/pkg/postgres"
clientspg "github.com/absmach/magistrala/pkg/postgres"
"github.com/absmach/magistrala/pkg/prometheus"
mgsdk "github.com/absmach/magistrala/pkg/sdk/go"
"github.com/absmach/magistrala/pkg/server"
"github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
"github.com/caarlos0/env/v11"
"github.com/jmoiron/sqlx"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)
const (
svcName = "invitations"
envPrefixDB = "MG_INVITATIONS_DB_"
envPrefixHTTP = "MG_INVITATIONS_HTTP_"
envPrefixAuth = "MG_AUTH_GRPC_"
defDB = "invitations"
defSvcHTTPPort = "9020"
)
type config struct {
LogLevel string `env:"MG_INVITATIONS_LOG_LEVEL" envDefault:"info"`
UsersURL string `env:"MG_USERS_URL" envDefault:"http://localhost:9002"`
DomainsURL string `env:"MG_DOMAINS_URL" envDefault:"http://localhost:8189"`
InstanceID string `env:"MG_INVITATIONS_INSTANCE_ID" envDefault:""`
JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"`
SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"`
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
cfg := config{}
if err := env.Parse(&cfg); err != nil {
log.Fatalf("failed to load %s configuration : %s", svcName, err)
}
logger, err := mglog.New(os.Stdout, cfg.LogLevel)
if err != nil {
log.Fatalf("failed to init logger: %s", err.Error())
}
var exitCode int
defer mglog.ExitWithError(&exitCode)
if cfg.InstanceID == "" {
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
logger.Error(fmt.Sprintf("failed to generate instanceID: %s", err))
exitCode = 1
return
}
}
dbConfig := clientspg.Config{Name: defDB}
if err := env.ParseWithOptions(&dbConfig, env.Options{Prefix: envPrefixDB}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s database configuration : %s", svcName, err))
exitCode = 1
return
}
db, err := clientspg.Setup(dbConfig, *invitationspg.Migration())
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer db.Close()
authClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&authClientCfg, env.Options{Prefix: envPrefixAuth}); err != nil {
logger.Error(fmt.Sprintf("failed to load auth gRPC client configuration : %s", err.Error()))
exitCode = 1
return
}
tokenClient, tokenHandler, err := grpcclient.SetupTokenClient(ctx, authClientCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer tokenHandler.Close()
logger.Info("Token service client successfully connected to auth gRPC server " + tokenHandler.Secure())
authn, authnHandler, err := authsvcAuthn.NewAuthentication(ctx, authClientCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer authnHandler.Close()
logger.Info("AuthN successfully connected to auth gRPC server " + authnHandler.Secure())
authz, authzHandler, err := authsvcAuthz.NewAuthorization(ctx, authClientCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer authzHandler.Close()
logger.Info("Authz successfully connected to auth gRPC server " + authzHandler.Secure())
tp, err := jaeger.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID, cfg.TraceRatio)
if err != nil {
logger.Error(fmt.Sprintf("failed to init Jaeger: %s", err))
exitCode = 1
return
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
logger.Error(fmt.Sprintf("error shutting down tracer provider: %v", err))
}
}()
tracer := tp.Tracer(svcName)
svc, err := newService(db, dbConfig, authz, tokenClient, tracer, cfg, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to create %s service: %s", svcName, err))
exitCode = 1
return
}
httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
httpSvr := http.NewServer(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, logger, authn, cfg.InstanceID), logger)
if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
go chc.CallHome(ctx)
}
g.Go(func() error {
return httpSvr.Start()
})
g.Go(func() error {
return server.StopSignalHandler(ctx, cancel, logger, svcName, httpSvr)
})
if err := g.Wait(); err != nil {
logger.Error(fmt.Sprintf("%s service terminated: %s", svcName, err))
}
}
func newService(db *sqlx.DB, dbConfig clientspg.Config, authz mgauthz.Authorization, token magistrala.TokenServiceClient, tracer trace.Tracer, conf config, logger *slog.Logger) (invitations.Service, error) {
database := postgres.NewDatabase(db, dbConfig, tracer)
repo := invitationspg.NewRepository(database)
config := mgsdk.Config{
UsersURL: conf.UsersURL,
DomainsURL: conf.DomainsURL,
}
sdk := mgsdk.NewSDK(config)
svc := invitations.NewService(token, repo, sdk)
svc = middleware.AuthorizationMiddleware(authz, svc)
svc = middleware.Tracing(svc, tracer)
svc = middleware.Logging(logger, svc)
counter, latency := prometheus.MakeMetrics(svcName, "api")
svc = middleware.Metrics(counter, latency, svc)
return svc, nil
}
-193
View File
@@ -1,193 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package main contains journal main function to start the journal service.
package main
import (
"context"
"fmt"
"log"
"log/slog"
"net/url"
"os"
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
"github.com/absmach/magistrala/journal"
"github.com/absmach/magistrala/journal/api"
"github.com/absmach/magistrala/journal/events"
"github.com/absmach/magistrala/journal/middleware"
journalpg "github.com/absmach/magistrala/journal/postgres"
mglog "github.com/absmach/magistrala/logger"
authsvcAuthn "github.com/absmach/magistrala/pkg/authn/authsvc"
mgauthz "github.com/absmach/magistrala/pkg/authz"
authsvcAuthz "github.com/absmach/magistrala/pkg/authz/authsvc"
"github.com/absmach/magistrala/pkg/events/store"
"github.com/absmach/magistrala/pkg/grpcclient"
jaegerclient "github.com/absmach/magistrala/pkg/jaeger"
"github.com/absmach/magistrala/pkg/postgres"
pgclient "github.com/absmach/magistrala/pkg/postgres"
"github.com/absmach/magistrala/pkg/prometheus"
"github.com/absmach/magistrala/pkg/server"
"github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
"github.com/caarlos0/env/v11"
"github.com/jmoiron/sqlx"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)
const (
svcName = "journal"
envPrefixDB = "MG_JOURNAL_DB_"
envPrefixHTTP = "MG_JOURNAL_HTTP_"
envPrefixAuth = "MG_AUTH_GRPC_"
defDB = "journal"
defSvcHTTPPort = "9021"
)
type config struct {
LogLevel string `env:"MG_JOURNAL_LOG_LEVEL" envDefault:"info"`
ESURL string `env:"MG_ES_URL" envDefault:"nats://localhost:4222"`
JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MG_JOURNAL_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"`
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
cfg := config{}
if err := env.Parse(&cfg); err != nil {
log.Fatalf("failed to load %s configuration : %s", svcName, err)
}
logger, err := mglog.New(os.Stdout, cfg.LogLevel)
if err != nil {
log.Fatalf("failed to init logger: %s", err)
}
var exitCode int
defer mglog.ExitWithError(&exitCode)
if cfg.InstanceID == "" {
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
logger.Error(fmt.Sprintf("failed to generate instanceID: %s", err))
exitCode = 1
return
}
}
dbConfig := pgclient.Config{Name: defDB}
if err := env.ParseWithOptions(&dbConfig, env.Options{Prefix: envPrefixDB}); err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
db, err := pgclient.Setup(dbConfig, *journalpg.Migration())
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer db.Close()
authClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&authClientCfg, env.Options{Prefix: envPrefixAuth}); err != nil {
logger.Error(fmt.Sprintf("failed to load auth gRPC client configuration : %s", err))
exitCode = 1
return
}
authn, authnHandler, err := authsvcAuthn.NewAuthentication(ctx, authClientCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer authnHandler.Close()
logger.Info("AuthN successfully connected to auth gRPC server " + authnHandler.Secure())
authz, authzHandler, err := authsvcAuthz.NewAuthorization(ctx, authClientCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer authzHandler.Close()
logger.Info("AuthZ successfully connected to auth gRPC server " + authzHandler.Secure())
tp, err := jaegerclient.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID, cfg.TraceRatio)
if err != nil {
logger.Error(fmt.Sprintf("failed to init Jaeger: %s", err))
exitCode = 1
return
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
logger.Error(fmt.Sprintf("error shutting down tracer provider: %s", err))
}
}()
tracer := tp.Tracer(svcName)
svc := newService(db, dbConfig, authz, logger, tracer)
subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to create subscriber: %s", err))
exitCode = 1
return
}
logger.Info("Subscribed to Event Store")
if err := events.Start(ctx, svcName, subscriber, svc); err != nil {
logger.Error("failed to start %s service: %s", svcName, err)
exitCode = 1
return
}
httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err.Error()))
exitCode = 1
return
}
hs := http.NewServer(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, authn, logger, svcName, cfg.InstanceID), logger)
if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
go chc.CallHome(ctx)
}
g.Go(func() error {
return hs.Start()
})
g.Go(func() error {
return server.StopSignalHandler(ctx, cancel, logger, svcName, hs)
})
if err := g.Wait(); err != nil {
logger.Error(fmt.Sprintf("%s service terminated: %s", svcName, err))
}
}
func newService(db *sqlx.DB, dbConfig pgclient.Config, authz mgauthz.Authorization, logger *slog.Logger, tracer trace.Tracer) journal.Service {
database := postgres.NewDatabase(db, dbConfig, tracer)
repo := journalpg.NewRepository(database)
idp := uuid.New()
svc := journal.NewService(idp, repo)
svc = middleware.AuthorizationMiddleware(svc, authz)
svc = middleware.LoggingMiddleware(svc, logger)
counter, latency := prometheus.MakeMetrics("journal", "journal_writer")
svc = middleware.MetricsMiddleware(svc, counter, latency)
svc = middleware.Tracing(svc, tracer)
return svc
}
-288
View File
@@ -1,288 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package main contains mqtt-adapter main function to start the mqtt-adapter service.
package main
import (
"context"
"fmt"
"io"
"log"
"log/slog"
"net/http"
"net/url"
"os"
"os/signal"
"syscall"
"time"
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
mglog "github.com/absmach/magistrala/logger"
"github.com/absmach/magistrala/mqtt"
"github.com/absmach/magistrala/mqtt/events"
mqtttracing "github.com/absmach/magistrala/mqtt/tracing"
"github.com/absmach/magistrala/pkg/errors"
"github.com/absmach/magistrala/pkg/grpcclient"
jaegerclient "github.com/absmach/magistrala/pkg/jaeger"
"github.com/absmach/magistrala/pkg/messaging/brokers"
brokerstracing "github.com/absmach/magistrala/pkg/messaging/brokers/tracing"
"github.com/absmach/magistrala/pkg/messaging/handler"
mqttpub "github.com/absmach/magistrala/pkg/messaging/mqtt"
"github.com/absmach/magistrala/pkg/server"
"github.com/absmach/magistrala/pkg/uuid"
mgate "github.com/absmach/mgate"
mgatemqtt "github.com/absmach/mgate/pkg/mqtt"
"github.com/absmach/mgate/pkg/mqtt/websocket"
"github.com/absmach/mgate/pkg/session"
"github.com/caarlos0/env/v11"
"github.com/cenkalti/backoff/v4"
"golang.org/x/sync/errgroup"
)
const (
svcName = "mqtt"
envPrefixThings = "MG_THINGS_AUTH_GRPC_"
wsPathPrefix = "/mqtt"
)
type config struct {
LogLevel string `env:"MG_MQTT_ADAPTER_LOG_LEVEL" envDefault:"info"`
MQTTPort string `env:"MG_MQTT_ADAPTER_MQTT_PORT" envDefault:"1883"`
MQTTTargetHost string `env:"MG_MQTT_ADAPTER_MQTT_TARGET_HOST" envDefault:"localhost"`
MQTTTargetPort string `env:"MG_MQTT_ADAPTER_MQTT_TARGET_PORT" envDefault:"1883"`
MQTTForwarderTimeout time.Duration `env:"MG_MQTT_ADAPTER_FORWARDER_TIMEOUT" envDefault:"30s"`
MQTTTargetHealthCheck string `env:"MG_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envDefault:""`
MQTTQoS uint8 `env:"MG_MQTT_ADAPTER_MQTT_QOS" envDefault:"1"`
HTTPPort string `env:"MG_MQTT_ADAPTER_WS_PORT" envDefault:"8080"`
HTTPTargetHost string `env:"MG_MQTT_ADAPTER_WS_TARGET_HOST" envDefault:"localhost"`
HTTPTargetPort string `env:"MG_MQTT_ADAPTER_WS_TARGET_PORT" envDefault:"8080"`
HTTPTargetPath string `env:"MG_MQTT_ADAPTER_WS_TARGET_PATH" envDefault:"/mqtt"`
Instance string `env:"MG_MQTT_ADAPTER_INSTANCE" envDefault:""`
JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
BrokerURL string `env:"MG_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MG_MQTT_ADAPTER_INSTANCE_ID" envDefault:""`
ESURL string `env:"MG_ES_URL" envDefault:"nats://localhost:4222"`
TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"`
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
cfg := config{}
if err := env.Parse(&cfg); err != nil {
log.Fatalf("failed to load %s configuration : %s", svcName, err)
}
logger, err := mglog.New(os.Stdout, cfg.LogLevel)
if err != nil {
log.Fatalf("failed to init logger: %s", err.Error())
}
var exitCode int
defer mglog.ExitWithError(&exitCode)
if cfg.InstanceID == "" {
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
logger.Error(fmt.Sprintf("failed to generate instanceID: %s", err))
exitCode = 1
return
}
}
if cfg.MQTTTargetHealthCheck != "" {
notify := func(e error, next time.Duration) {
logger.Info(fmt.Sprintf("Broker not ready: %s, next try in %s", e.Error(), next))
}
err := backoff.RetryNotify(healthcheck(cfg), backoff.NewExponentialBackOff(), notify)
if err != nil {
logger.Error(fmt.Sprintf("MQTT healthcheck limit exceeded, exiting. %s ", err))
exitCode = 1
return
}
}
serverConfig := server.Config{
Host: cfg.HTTPTargetHost,
Port: cfg.HTTPTargetPort,
}
tp, err := jaegerclient.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID, cfg.TraceRatio)
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err))
exitCode = 1
return
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
logger.Error(fmt.Sprintf("Error shutting down tracer provider: %v", err))
}
}()
tracer := tp.Tracer(svcName)
bsub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
defer bsub.Close()
bsub = brokerstracing.NewPubSub(serverConfig, tracer, bsub)
mpub, err := mqttpub.NewPublisher(fmt.Sprintf("mqtt://%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort), cfg.MQTTQoS, cfg.MQTTForwarderTimeout)
if err != nil {
logger.Error(fmt.Sprintf("failed to create MQTT publisher: %s", err))
exitCode = 1
return
}
defer mpub.Close()
fwd := mqtt.NewForwarder(brokers.SubjectAllChannels, logger)
fwd = mqtttracing.New(serverConfig, tracer, fwd, brokers.SubjectAllChannels)
if err := fwd.Forward(ctx, svcName, bsub, mpub); err != nil {
logger.Error(fmt.Sprintf("failed to forward message broker messages: %s", err))
exitCode = 1
return
}
np, err := brokers.NewPublisher(ctx, cfg.BrokerURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
defer np.Close()
np = brokerstracing.NewPublisher(serverConfig, tracer, np)
es, err := events.NewEventStore(ctx, cfg.ESURL, cfg.Instance)
if err != nil {
logger.Error(fmt.Sprintf("failed to create %s event store : %s", svcName, err))
exitCode = 1
return
}
thingsClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&thingsClientCfg, env.Options{Prefix: envPrefixThings}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s auth configuration : %s", svcName, err))
exitCode = 1
return
}
thingsClient, thingsHandler, err := grpcclient.SetupThingsClient(ctx, thingsClientCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer thingsHandler.Close()
logger.Info("Things service gRPC client successfully connected to things gRPC server " + thingsHandler.Secure())
h := mqtt.NewHandler(np, es, logger, thingsClient)
h = handler.NewTracing(tracer, h)
if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
go chc.CallHome(ctx)
}
var interceptor session.Interceptor
logger.Info(fmt.Sprintf("Starting MQTT proxy on port %s", cfg.MQTTPort))
g.Go(func() error {
return proxyMQTT(ctx, cfg, logger, h, interceptor)
})
logger.Info(fmt.Sprintf("Starting MQTT over WS proxy on port %s", cfg.HTTPPort))
g.Go(func() error {
return proxyWS(ctx, cfg, logger, h, interceptor)
})
g.Go(func() error {
return stopSignalHandler(ctx, cancel, logger)
})
if err := g.Wait(); err != nil {
logger.Error(fmt.Sprintf("mProxy terminated: %s", err))
}
}
func proxyMQTT(ctx context.Context, cfg config, logger *slog.Logger, sessionHandler session.Handler, interceptor session.Interceptor) error {
config := mgate.Config{
Address: fmt.Sprintf(":%s", cfg.MQTTPort),
Target: fmt.Sprintf("%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort),
}
mproxy := mgatemqtt.New(config, sessionHandler, interceptor, logger)
errCh := make(chan error)
go func() {
errCh <- mproxy.Listen(ctx)
}()
select {
case <-ctx.Done():
logger.Info(fmt.Sprintf("proxy MQTT shutdown at %s", config.Target))
return nil
case err := <-errCh:
return err
}
}
func proxyWS(ctx context.Context, cfg config, logger *slog.Logger, sessionHandler session.Handler, interceptor session.Interceptor) error {
config := mgate.Config{
Address: fmt.Sprintf("%s:%s", "", cfg.HTTPPort),
Target: fmt.Sprintf("ws://%s:%s%s", cfg.HTTPTargetHost, cfg.HTTPTargetPort, wsPathPrefix),
PathPrefix: wsPathPrefix,
}
wp := websocket.New(config, sessionHandler, interceptor, logger)
http.HandleFunc(wsPathPrefix, wp.ServeHTTP)
errCh := make(chan error)
go func() {
errCh <- wp.Listen(ctx)
}()
select {
case <-ctx.Done():
logger.Info(fmt.Sprintf("proxy MQTT WS shutdown at %s", config.Target))
return nil
case err := <-errCh:
return err
}
}
func healthcheck(cfg config) func() error {
return func() error {
res, err := http.Get(cfg.MQTTTargetHealthCheck)
if err != nil {
return err
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
return errors.New(string(body))
}
return nil
}
}
func stopSignalHandler(ctx context.Context, cancel context.CancelFunc, logger *slog.Logger) error {
c := make(chan os.Signal, 2)
signal.Notify(c, syscall.SIGINT, syscall.SIGABRT)
select {
case sig := <-c:
defer cancel()
logger.Info(fmt.Sprintf("%s service shutdown by signal: %s", svcName, sig))
return nil
case <-ctx.Done():
return nil
}
}
+60 -54
View File
@@ -12,38 +12,38 @@ import (
"os"
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
mglog "github.com/absmach/magistrala/logger"
authsvcAuthn "github.com/absmach/magistrala/pkg/authn/authsvc"
"github.com/absmach/magistrala/pkg/authz/authsvc"
"github.com/absmach/magistrala/pkg/grpcclient"
pgclient "github.com/absmach/magistrala/pkg/postgres"
"github.com/absmach/magistrala/pkg/prometheus"
"github.com/absmach/magistrala/pkg/server"
httpserver "github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
"github.com/absmach/magistrala/readers"
"github.com/absmach/magistrala/readers/api"
"github.com/absmach/magistrala/readers/postgres"
"github.com/absmach/supermq"
smqlog "github.com/absmach/supermq/logger"
"github.com/absmach/supermq/pkg/authn/authsvc"
"github.com/absmach/supermq/pkg/grpcclient"
pgclient "github.com/absmach/supermq/pkg/postgres"
"github.com/absmach/supermq/pkg/prometheus"
"github.com/absmach/supermq/pkg/server"
httpserver "github.com/absmach/supermq/pkg/server/http"
"github.com/absmach/supermq/pkg/uuid"
"github.com/absmach/supermq/readers"
httpapi "github.com/absmach/supermq/readers/api"
"github.com/absmach/supermq/readers/postgres"
"github.com/caarlos0/env/v11"
"github.com/jmoiron/sqlx"
"golang.org/x/sync/errgroup"
)
const (
svcName = "postgres-reader"
envPrefixDB = "MG_POSTGRES_"
envPrefixHTTP = "MG_POSTGRES_READER_HTTP_"
envPrefixAuth = "MG_AUTH_GRPC_"
envPrefixThings = "MG_THINGS_AUTH_GRPC_"
defDB = "magistrala"
defSvcHTTPPort = "9009"
svcName = "postgres-reader"
envPrefixDB = "SMQ_POSTGRES_"
envPrefixHTTP = "SMQ_POSTGRES_READER_HTTP_"
envPrefixAuth = "SMQ_AUTH_GRPC_"
envPrefixClients = "SMQ_CLIENTS_AUTH_GRPC_"
envPrefixChannels = "SMQ_CHANNELS_GRPC_"
defDB = "supermq"
defSvcHTTPPort = "9009"
)
type config struct {
LogLevel string `env:"MG_POSTGRES_READER_LOG_LEVEL" envDefault:"info"`
SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MG_POSTGRES_READER_INSTANCE_ID" envDefault:""`
LogLevel string `env:"SMQ_POSTGRES_READER_LOG_LEVEL" envDefault:"info"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_POSTGRES_READER_INSTANCE_ID" envDefault:""`
}
func main() {
@@ -55,13 +55,13 @@ func main() {
log.Fatalf("failed to load %s configuration : %s", svcName, err)
}
logger, err := mglog.New(os.Stdout, cfg.LogLevel)
logger, err := smqlog.New(os.Stdout, cfg.LogLevel)
if err != nil {
log.Fatalf("failed to init logger: %s", err.Error())
}
var exitCode int
defer mglog.ExitWithError(&exitCode)
defer smqlog.ExitWithError(&exitCode)
if cfg.InstanceID == "" {
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
@@ -85,47 +85,53 @@ func main() {
}
defer db.Close()
clientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&clientCfg, env.Options{Prefix: envPrefixAuth}); err != nil {
logger.Error(fmt.Sprintf("failed to load auth gRPC client configuration : %s", err))
clientsClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&clientsClientCfg, env.Options{Prefix: envPrefixClients}); err != nil {
logger.Error(fmt.Sprintf("failed to load clients gRPC client configuration : %s", err))
exitCode = 1
return
}
authz, authzHandler, err := authsvc.NewAuthorization(ctx, clientCfg)
clientsClient, clientsHandler, err := grpcclient.SetupClientsClient(ctx, clientsClientCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer authzHandler.Close()
logger.Info("AuthZ successfully connected to auth gRPC server " + authzHandler.Secure())
defer clientsHandler.Close()
logger.Info("Clients service gRPC client successfully connected to clients gRPC server " + clientsHandler.Secure())
authn, authnHandler, err := authsvcAuthn.NewAuthentication(ctx, clientCfg)
channelsClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&channelsClientCfg, env.Options{Prefix: envPrefixChannels}); err != nil {
logger.Error(fmt.Sprintf("failed to load channels gRPC client configuration : %s", err))
exitCode = 1
return
}
channelsClient, channelsHandler, err := grpcclient.SetupChannelsClient(ctx, channelsClientCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer channelsHandler.Close()
logger.Info("Channels service gRPC client successfully connected to channels gRPC server " + channelsHandler.Secure())
authnCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&authnCfg, env.Options{Prefix: envPrefixAuth}); err != nil {
logger.Error(fmt.Sprintf("failed to load auth gRPC client configuration : %s", err))
exitCode = 1
return
}
authn, authnHandler, err := authsvc.NewAuthentication(ctx, authnCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer authnHandler.Close()
logger.Info("AuthN successfully connected to auth gRPC server " + authnHandler.Secure())
thingsClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&thingsClientCfg, env.Options{Prefix: envPrefixThings}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s auth configuration : %s", svcName, err))
exitCode = 1
return
}
thingsClient, thingsHandler, err := grpcclient.SetupThingsClient(ctx, thingsClientCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer thingsHandler.Close()
logger.Info("Things service gRPC client successfully connected to things gRPC server " + thingsHandler.Secure())
logger.Info("authn successfully connected to auth gRPC server " + authnHandler.Secure())
repo := newService(db, logger)
@@ -135,10 +141,10 @@ func main() {
exitCode = 1
return
}
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, authn, authz, thingsClient, svcName, cfg.InstanceID), logger)
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(repo, authn, clientsClient, channelsClient, svcName, cfg.InstanceID), logger)
if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
chc := chclient.New(svcName, supermq.Version, logger, cancel)
go chc.CallHome(ctx)
}
@@ -157,9 +163,9 @@ func main() {
func newService(db *sqlx.DB, logger *slog.Logger) readers.MessageRepository {
svc := postgres.New(db)
svc = api.LoggingMiddleware(svc, logger)
svc = httpapi.LoggingMiddleware(svc, logger)
counter, latency := prometheus.MakeMetrics("postgres", "message_reader")
svc = api.MetricsMiddleware(svc, counter, latency)
svc = httpapi.MetricsMiddleware(svc, counter, latency)
return svc
}
+29 -29
View File
@@ -13,20 +13,20 @@ import (
"os"
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
"github.com/absmach/magistrala/consumers"
consumertracing "github.com/absmach/magistrala/consumers/tracing"
"github.com/absmach/magistrala/consumers/writers/api"
writerpg "github.com/absmach/magistrala/consumers/writers/postgres"
mglog "github.com/absmach/magistrala/logger"
jaegerclient "github.com/absmach/magistrala/pkg/jaeger"
"github.com/absmach/magistrala/pkg/messaging/brokers"
brokerstracing "github.com/absmach/magistrala/pkg/messaging/brokers/tracing"
pgclient "github.com/absmach/magistrala/pkg/postgres"
"github.com/absmach/magistrala/pkg/prometheus"
"github.com/absmach/magistrala/pkg/server"
httpserver "github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
"github.com/absmach/supermq"
"github.com/absmach/supermq/consumers"
consumertracing "github.com/absmach/supermq/consumers/tracing"
httpapi "github.com/absmach/supermq/consumers/writers/api"
writerpg "github.com/absmach/supermq/consumers/writers/postgres"
smqlog "github.com/absmach/supermq/logger"
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
"github.com/absmach/supermq/pkg/messaging/brokers"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
pgclient "github.com/absmach/supermq/pkg/postgres"
"github.com/absmach/supermq/pkg/prometheus"
"github.com/absmach/supermq/pkg/server"
httpserver "github.com/absmach/supermq/pkg/server/http"
"github.com/absmach/supermq/pkg/uuid"
"github.com/caarlos0/env/v11"
"github.com/jmoiron/sqlx"
"golang.org/x/sync/errgroup"
@@ -34,20 +34,20 @@ import (
const (
svcName = "postgres-writer"
envPrefixDB = "MG_POSTGRES_"
envPrefixHTTP = "MG_POSTGRES_WRITER_HTTP_"
envPrefixDB = "SMQ_POSTGRES_"
envPrefixHTTP = "SMQ_POSTGRES_WRITER_HTTP_"
defDB = "messages"
defSvcHTTPPort = "9010"
)
type config struct {
LogLevel string `env:"MG_POSTGRES_WRITER_LOG_LEVEL" envDefault:"info"`
ConfigPath string `env:"MG_POSTGRES_WRITER_CONFIG_PATH" envDefault:"/config.toml"`
BrokerURL string `env:"MG_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MG_POSTGRES_WRITER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"`
LogLevel string `env:"SMQ_POSTGRES_WRITER_LOG_LEVEL" envDefault:"info"`
ConfigPath string `env:"SMQ_POSTGRES_WRITER_CONFIG_PATH" envDefault:"/config.toml"`
BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_POSTGRES_WRITER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
}
func main() {
@@ -59,13 +59,13 @@ func main() {
log.Fatalf("failed to load %s configuration : %s", svcName, err)
}
logger, err := mglog.New(os.Stdout, cfg.LogLevel)
logger, err := smqlog.New(os.Stdout, cfg.LogLevel)
if err != nil {
log.Fatalf("failed to init logger: %s", err.Error())
}
var exitCode int
defer mglog.ExitWithError(&exitCode)
defer smqlog.ExitWithError(&exitCode)
if cfg.InstanceID == "" {
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
@@ -125,10 +125,10 @@ func main() {
return
}
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svcName, cfg.InstanceID), logger)
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(svcName, cfg.InstanceID), logger)
if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
chc := chclient.New(svcName, supermq.Version, logger, cancel)
go chc.CallHome(ctx)
}
@@ -147,8 +147,8 @@ func main() {
func newService(db *sqlx.DB, logger *slog.Logger) consumers.BlockingConsumer {
svc := writerpg.New(db)
svc = api.LoggingMiddleware(svc, logger)
svc = httpapi.LoggingMiddleware(svc, logger)
counter, latency := prometheus.MakeMetrics("postgres", "message_writer")
svc = api.MetricsMiddleware(svc, counter, latency)
svc = httpapi.MetricsMiddleware(svc, counter, latency)
return svc
}
+19 -19
View File
@@ -13,17 +13,17 @@ import (
"reflect"
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
mglog "github.com/absmach/magistrala/logger"
"github.com/absmach/magistrala/pkg/errors"
mggroups "github.com/absmach/magistrala/pkg/groups"
mgsdk "github.com/absmach/magistrala/pkg/sdk/go"
"github.com/absmach/magistrala/pkg/server"
httpserver "github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
"github.com/absmach/magistrala/provision"
"github.com/absmach/magistrala/provision/api"
"github.com/absmach/magistrala/things"
httpapi "github.com/absmach/magistrala/provision/api"
"github.com/absmach/supermq"
"github.com/absmach/supermq/channels"
"github.com/absmach/supermq/clients"
smqlog "github.com/absmach/supermq/logger"
"github.com/absmach/supermq/pkg/errors"
mgsdk "github.com/absmach/supermq/pkg/sdk"
"github.com/absmach/supermq/pkg/server"
httpserver "github.com/absmach/supermq/pkg/server/http"
"github.com/absmach/supermq/pkg/uuid"
"github.com/caarlos0/env/v11"
"golang.org/x/sync/errgroup"
)
@@ -48,13 +48,13 @@ func main() {
log.Fatalf("failed to load %s configuration : %s", svcName, err)
}
logger, err := mglog.New(os.Stdout, cfg.Server.LogLevel)
logger, err := smqlog.New(os.Stdout, cfg.Server.LogLevel)
if err != nil {
log.Fatalf("failed to init logger: %s", err.Error())
}
var exitCode int
defer mglog.ExitWithError(&exitCode)
defer smqlog.ExitWithError(&exitCode)
if cfg.InstanceID == "" {
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
@@ -75,7 +75,7 @@ func main() {
SDKCfg := mgsdk.Config{
UsersURL: cfg.Server.UsersURL,
ThingsURL: cfg.Server.ThingsURL,
ClientsURL: cfg.Server.ClientsURL,
BootstrapURL: cfg.Server.MgBSURL,
CertsURL: cfg.Server.MgCertsURL,
MsgContentType: contentType,
@@ -84,13 +84,13 @@ func main() {
SDK := mgsdk.NewSDK(SDKCfg)
svc := provision.New(cfg, SDK, logger)
svc = api.NewLoggingMiddleware(svc, logger)
svc = httpapi.NewLoggingMiddleware(svc, logger)
httpServerConfig := server.Config{Host: "", Port: cfg.Server.HTTPPort, KeyFile: cfg.Server.ServerKey, CertFile: cfg.Server.ServerCert}
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, logger, cfg.InstanceID), logger)
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(svc, logger, cfg.InstanceID), logger)
if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
chc := chclient.New(svcName, supermq.Version, logger, cancel)
go chc.CallHome(ctx)
}
@@ -138,7 +138,7 @@ func loadConfig() (provision.Config, error) {
cfg.Bootstrap.Content = content
// This is default conf for provision if there is no config file
cfg.Channels = []mggroups.Group{
cfg.Channels = []channels.Channel{
{
Name: "control-channel",
Metadata: map[string]interface{}{"type": "control"},
@@ -147,9 +147,9 @@ func loadConfig() (provision.Config, error) {
Metadata: map[string]interface{}{"type": "data"},
},
}
cfg.Things = []things.Client{
cfg.Clients = []clients.Client{
{
Name: "thing",
Name: "client",
Metadata: map[string]interface{}{"external_id": "xxxxxx"},
},
}
+40 -43
View File
@@ -14,51 +14,49 @@ import (
"time"
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
"github.com/absmach/magistrala/consumers"
redisclient "github.com/absmach/magistrala/internal/clients/redis"
mglog "github.com/absmach/magistrala/logger"
authnsvc "github.com/absmach/magistrala/pkg/authn/authsvc"
mgauthz "github.com/absmach/magistrala/pkg/authz"
authzsvc "github.com/absmach/magistrala/pkg/authz/authsvc"
"github.com/absmach/magistrala/pkg/grpcclient"
jaegerclient "github.com/absmach/magistrala/pkg/jaeger"
"github.com/absmach/magistrala/pkg/messaging/brokers"
brokerstracing "github.com/absmach/magistrala/pkg/messaging/brokers/tracing"
pgclient "github.com/absmach/magistrala/pkg/postgres"
"github.com/absmach/magistrala/pkg/server"
httpserver "github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
"github.com/absmach/magistrala/re"
httpapi "github.com/absmach/magistrala/re/api"
repg "github.com/absmach/magistrala/re/postgres"
"github.com/absmach/supermq"
"github.com/absmach/supermq/consumers"
smqlog "github.com/absmach/supermq/logger"
authnsvc "github.com/absmach/supermq/pkg/authn/authsvc"
mgauthz "github.com/absmach/supermq/pkg/authz"
authzsvc "github.com/absmach/supermq/pkg/authz/authsvc"
"github.com/absmach/supermq/pkg/grpcclient"
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
"github.com/absmach/supermq/pkg/messaging/brokers"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
pgclient "github.com/absmach/supermq/pkg/postgres"
"github.com/absmach/supermq/pkg/server"
httpserver "github.com/absmach/supermq/pkg/server/http"
"github.com/absmach/supermq/pkg/uuid"
"github.com/caarlos0/env/v11"
"github.com/jmoiron/sqlx"
"github.com/redis/go-redis/v9"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)
const (
svcName = "rules_engine"
envPrefixDB = "MG_RE_DB_"
envPrefixHTTP = "MG_RE_HTTP_"
envPrefixAuth = "MG_AUTH_GRPC_"
envPrefixDB = "SMQ_RE_DB_"
envPrefixHTTP = "SMQ_RE_HTTP_"
envPrefixAuth = "SMQ_AUTH_GRPC_"
defDB = "r"
defSvcHTTPPort = "9008"
)
type config struct {
LogLevel string `env:"MG_RE_LOG_LEVEL" envDefault:"info"`
InstanceID string `env:"MG_RE_INSTANCE_ID" envDefault:""`
JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"`
ESURL string `env:"MG_ES_URL" envDefault:"nats://localhost:4222"`
CacheURL string `env:"MG_RE_CACHE_URL" envDefault:"redis://localhost:6379/0"`
CacheKeyDuration time.Duration `env:"MG_RE_CACHE_KEY_DURATION" envDefault:"10m"`
TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ConfigPath string `env:"MG_RE_CONFIG_PATH" envDefault:"/config.toml"`
BrokerURL string `env:"MG_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
LogLevel string `env:"SMQ_RE_LOG_LEVEL" envDefault:"info"`
InstanceID string `env:"SMQ_RE_INSTANCE_ID" envDefault:""`
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
CacheURL string `env:"SMQ_RE_CACHE_URL" envDefault:"redis://localhost:6379/0"`
CacheKeyDuration time.Duration `env:"SMQ_RE_CACHE_KEY_DURATION" envDefault:"10m"`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ConfigPath string `env:"SMQ_RE_CONFIG_PATH" envDefault:"/config.toml"`
BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
}
func main() {
@@ -72,13 +70,13 @@ func main() {
}
var logger *slog.Logger
logger, err := mglog.New(os.Stdout, cfg.LogLevel)
logger, err := smqlog.New(os.Stdout, cfg.LogLevel)
if err != nil {
log.Fatalf("failed to init logger: %s", err.Error())
}
var exitCode int
defer mglog.ExitWithError(&exitCode)
defer smqlog.ExitWithError(&exitCode)
if cfg.InstanceID == "" {
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
@@ -115,7 +113,6 @@ func main() {
}
}()
tracer := tp.Tracer(svcName)
pubSub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
@@ -133,13 +130,13 @@ func main() {
pubSub = brokerstracing.NewPubSub(httpServerConfig, tracer, pubSub)
// Setup new redis cache client
cacheclient, err := redisclient.Connect(cfg.CacheURL)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer cacheclient.Close()
// cacheclient, err := redisclient.Connect(cfg.CacheURL)
// if err != nil {
// logger.Error(err.Error())
// exitCode = 1
// return
// }
// defer cacheclient.Close()
grpcCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&grpcCfg, env.Options{Prefix: envPrefixAuth}); err != nil {
@@ -156,7 +153,7 @@ func main() {
defer authnClient.Close()
logger.Info("AuthN successfully connected to auth gRPC server " + authnClient.Secure())
authz, authzClient, err := authzsvc.NewAuthorization(ctx, grpcCfg)
authz, authzClient, err := authzsvc.NewAuthorization(ctx, grpcCfg, nil)
if err != nil {
logger.Error(err.Error())
exitCode = 1
@@ -165,7 +162,7 @@ func main() {
defer authzClient.Close()
logger.Info("AuthZ successfully connected to auth gRPC server " + authnClient.Secure())
svc, err := newService(ctx, db, dbConfig, authz, cacheclient, cfg.CacheKeyDuration, cfg.ESURL, tracer, logger)
svc, err := newService(ctx, db, dbConfig, authz, cfg.ESURL, tracer, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to create services: %s", err))
exitCode = 1
@@ -180,7 +177,7 @@ func main() {
httpSvc := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(svc, authn, logger, cfg.InstanceID), logger)
if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
chc := chclient.New(svcName, supermq.Version, logger, cancel)
go chc.CallHome(ctx)
}
@@ -198,7 +195,7 @@ func main() {
}
}
func newService(ctx context.Context, db *sqlx.DB, dbConfig pgclient.Config, authz mgauthz.Authorization, cacheClient *redis.Client, keyDuration time.Duration, esURL string, tracer trace.Tracer, logger *slog.Logger) (re.Service, error) {
func newService(ctx context.Context, db *sqlx.DB, dbConfig pgclient.Config, authz mgauthz.Authorization, esURL string, tracer trace.Tracer, logger *slog.Logger) (re.Service, error) {
database := pgclient.NewDatabase(db, dbConfig, tracer)
repo := repg.NewRepository(database)
idp := uuid.New()
-291
View File
@@ -1,291 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package main contains things main function to start the things service.
package main
import (
"context"
"fmt"
"log"
"log/slog"
"net/url"
"os"
"time"
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
redisclient "github.com/absmach/magistrala/internal/clients/redis"
mggroups "github.com/absmach/magistrala/internal/groups"
gevents "github.com/absmach/magistrala/internal/groups/events"
gmiddleware "github.com/absmach/magistrala/internal/groups/middleware"
gpostgres "github.com/absmach/magistrala/internal/groups/postgres"
gtracing "github.com/absmach/magistrala/internal/groups/tracing"
mglog "github.com/absmach/magistrala/logger"
authsvcAuthn "github.com/absmach/magistrala/pkg/authn/authsvc"
mgauthz "github.com/absmach/magistrala/pkg/authz"
authsvcAuthz "github.com/absmach/magistrala/pkg/authz/authsvc"
"github.com/absmach/magistrala/pkg/groups"
"github.com/absmach/magistrala/pkg/grpcclient"
jaegerclient "github.com/absmach/magistrala/pkg/jaeger"
"github.com/absmach/magistrala/pkg/policies"
"github.com/absmach/magistrala/pkg/policies/spicedb"
"github.com/absmach/magistrala/pkg/postgres"
pgclient "github.com/absmach/magistrala/pkg/postgres"
"github.com/absmach/magistrala/pkg/prometheus"
"github.com/absmach/magistrala/pkg/server"
grpcserver "github.com/absmach/magistrala/pkg/server/grpc"
httpserver "github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
"github.com/absmach/magistrala/things"
grpcapi "github.com/absmach/magistrala/things/api/grpc"
httpapi "github.com/absmach/magistrala/things/api/http"
thcache "github.com/absmach/magistrala/things/cache"
thevents "github.com/absmach/magistrala/things/events"
tmiddleware "github.com/absmach/magistrala/things/middleware"
thingspg "github.com/absmach/magistrala/things/postgres"
ctracing "github.com/absmach/magistrala/things/tracing"
"github.com/authzed/authzed-go/v1"
"github.com/authzed/grpcutil"
"github.com/caarlos0/env/v11"
"github.com/go-chi/chi/v5"
"github.com/jmoiron/sqlx"
"github.com/redis/go-redis/v9"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/reflection"
)
const (
svcName = "things"
envPrefixDB = "MG_THINGS_DB_"
envPrefixHTTP = "MG_THINGS_HTTP_"
envPrefixGRPC = "MG_THINGS_AUTH_GRPC_"
envPrefixAuth = "MG_AUTH_GRPC_"
defDB = "things"
defSvcHTTPPort = "9000"
defSvcAuthGRPCPort = "7000"
streamID = "magistrala.things"
)
type config struct {
LogLevel string `env:"MG_THINGS_LOG_LEVEL" envDefault:"info"`
StandaloneID string `env:"MG_THINGS_STANDALONE_ID" envDefault:""`
StandaloneToken string `env:"MG_THINGS_STANDALONE_TOKEN" envDefault:""`
JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
CacheKeyDuration time.Duration `env:"MG_THINGS_CACHE_KEY_DURATION" envDefault:"10m"`
SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MG_THINGS_INSTANCE_ID" envDefault:""`
ESURL string `env:"MG_ES_URL" envDefault:"nats://localhost:4222"`
CacheURL string `env:"MG_THINGS_CACHE_URL" envDefault:"redis://localhost:6379/0"`
TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"`
SpicedbHost string `env:"MG_SPICEDB_HOST" envDefault:"localhost"`
SpicedbPort string `env:"MG_SPICEDB_PORT" envDefault:"50051"`
SpicedbPreSharedKey string `env:"MG_SPICEDB_PRE_SHARED_KEY" envDefault:"12345678"`
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
// Create new things configuration
cfg := config{}
if err := env.Parse(&cfg); err != nil {
log.Fatalf("failed to load %s configuration : %s", svcName, err)
}
var logger *slog.Logger
logger, err := mglog.New(os.Stdout, cfg.LogLevel)
if err != nil {
log.Fatalf("failed to init logger: %s", err.Error())
}
var exitCode int
defer mglog.ExitWithError(&exitCode)
if cfg.InstanceID == "" {
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
logger.Error(fmt.Sprintf("failed to generate instanceID: %s", err))
exitCode = 1
return
}
}
// Create new database for things
dbConfig := pgclient.Config{Name: defDB}
if err := env.ParseWithOptions(&dbConfig, env.Options{Prefix: envPrefixDB}); err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
tm := thingspg.Migration()
gm := gpostgres.Migration()
tm.Migrations = append(tm.Migrations, gm.Migrations...)
db, err := pgclient.Setup(dbConfig, *tm)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer db.Close()
tp, err := jaegerclient.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID, cfg.TraceRatio)
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err))
exitCode = 1
return
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
logger.Error(fmt.Sprintf("Error shutting down tracer provider: %v", err))
}
}()
tracer := tp.Tracer(svcName)
// Setup new redis cache client
cacheclient, err := redisclient.Connect(cfg.CacheURL)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer cacheclient.Close()
policyEvaluator, policyService, err := newSpiceDBPolicyServiceEvaluator(cfg, logger)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
logger.Info("Policy evaluator and Policy manager are successfully connected to SpiceDB gRPC server")
grpcCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&grpcCfg, env.Options{Prefix: envPrefixAuth}); err != nil {
logger.Error(fmt.Sprintf("failed to load auth gRPC client configuration : %s", err))
exitCode = 1
return
}
authn, authnClient, err := authsvcAuthn.NewAuthentication(ctx, grpcCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer authnClient.Close()
logger.Info("AuthN successfully connected to auth gRPC server " + authnClient.Secure())
authz, authzClient, err := authsvcAuthz.NewAuthorization(ctx, grpcCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer authzClient.Close()
logger.Info("AuthZ successfully connected to auth gRPC server " + authnClient.Secure())
csvc, gsvc, err := newService(ctx, db, dbConfig, authz, policyEvaluator, policyService, cacheclient, cfg.CacheKeyDuration, cfg.ESURL, tracer, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to create services: %s", err))
exitCode = 1
return
}
httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
mux := chi.NewRouter()
httpSvc := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(csvc, gsvc, authn, mux, logger, cfg.InstanceID), logger)
grpcServerConfig := server.Config{Port: defSvcAuthGRPCPort}
if err := env.ParseWithOptions(&grpcServerConfig, env.Options{Prefix: envPrefixGRPC}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s gRPC server configuration : %s", svcName, err))
exitCode = 1
return
}
registerThingsServer := func(srv *grpc.Server) {
reflection.Register(srv)
magistrala.RegisterThingsServiceServer(srv, grpcapi.NewServer(csvc))
}
gs := grpcserver.NewServer(ctx, cancel, svcName, grpcServerConfig, registerThingsServer, logger)
if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
go chc.CallHome(ctx)
}
// Start all servers
g.Go(func() error {
return httpSvc.Start()
})
g.Go(func() error {
return gs.Start()
})
g.Go(func() error {
return server.StopSignalHandler(ctx, cancel, logger, svcName, httpSvc)
})
if err := g.Wait(); err != nil {
logger.Error(fmt.Sprintf("%s service terminated: %s", svcName, err))
}
}
func newService(ctx context.Context, db *sqlx.DB, dbConfig pgclient.Config, authz mgauthz.Authorization, pe policies.Evaluator, ps policies.Service, cacheClient *redis.Client, keyDuration time.Duration, esURL string, tracer trace.Tracer, logger *slog.Logger) (things.Service, groups.Service, error) {
database := postgres.NewDatabase(db, dbConfig, tracer)
cRepo := thingspg.NewRepository(database)
gRepo := gpostgres.New(database)
idp := uuid.New()
thingCache := thcache.NewCache(cacheClient, keyDuration)
csvc := things.NewService(pe, ps, cRepo, thingCache, idp)
gsvc := mggroups.NewService(gRepo, idp, ps)
csvc, err := thevents.NewEventStoreMiddleware(ctx, csvc, esURL)
if err != nil {
return nil, nil, err
}
gsvc, err = gevents.NewEventStoreMiddleware(ctx, gsvc, esURL, streamID)
if err != nil {
return nil, nil, err
}
csvc = tmiddleware.AuthorizationMiddleware(csvc, authz)
gsvc = gmiddleware.AuthorizationMiddleware(gsvc, authz)
csvc = ctracing.New(csvc, tracer)
csvc = tmiddleware.LoggingMiddleware(csvc, logger)
counter, latency := prometheus.MakeMetrics(svcName, "api")
csvc = tmiddleware.MetricsMiddleware(csvc, counter, latency)
gsvc = gtracing.New(gsvc, tracer)
gsvc = gmiddleware.LoggingMiddleware(gsvc, logger)
counter, latency = prometheus.MakeMetrics(fmt.Sprintf("%s_groups", svcName), "api")
gsvc = gmiddleware.MetricsMiddleware(gsvc, counter, latency)
return csvc, gsvc, err
}
func newSpiceDBPolicyServiceEvaluator(cfg config, logger *slog.Logger) (policies.Evaluator, policies.Service, error) {
client, err := authzed.NewClientWithExperimentalAPIs(
fmt.Sprintf("%s:%s", cfg.SpicedbHost, cfg.SpicedbPort),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpcutil.WithInsecureBearerToken(cfg.SpicedbPreSharedKey),
)
if err != nil {
return nil, nil, err
}
pe := spicedb.NewPolicyEvaluator(client, logger)
ps := spicedb.NewPolicyService(client, logger)
return pe, ps, nil
}
+61 -54
View File
@@ -12,38 +12,38 @@ import (
"os"
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
mglog "github.com/absmach/magistrala/logger"
authsvcAuthn "github.com/absmach/magistrala/pkg/authn/authsvc"
"github.com/absmach/magistrala/pkg/authz/authsvc"
"github.com/absmach/magistrala/pkg/grpcclient"
pgclient "github.com/absmach/magistrala/pkg/postgres"
"github.com/absmach/magistrala/pkg/prometheus"
"github.com/absmach/magistrala/pkg/server"
httpserver "github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
"github.com/absmach/magistrala/readers"
"github.com/absmach/magistrala/readers/api"
"github.com/absmach/magistrala/readers/timescale"
"github.com/absmach/supermq"
smqlog "github.com/absmach/supermq/logger"
"github.com/absmach/supermq/pkg/authn/authsvc"
"github.com/absmach/supermq/pkg/grpcclient"
pgclient "github.com/absmach/supermq/pkg/postgres"
"github.com/absmach/supermq/pkg/prometheus"
"github.com/absmach/supermq/pkg/server"
httpserver "github.com/absmach/supermq/pkg/server/http"
"github.com/absmach/supermq/pkg/uuid"
"github.com/absmach/supermq/readers"
httpapi "github.com/absmach/supermq/readers/api"
"github.com/absmach/supermq/readers/timescale"
"github.com/caarlos0/env/v11"
"github.com/jmoiron/sqlx"
"golang.org/x/sync/errgroup"
)
const (
svcName = "timescaledb-reader"
envPrefixDB = "MG_TIMESCALE_"
envPrefixHTTP = "MG_TIMESCALE_READER_HTTP_"
envPrefixAuth = "MG_AUTH_GRPC_"
envPrefixThings = "MG_THINGS_AUTH_GRPC_"
defDB = "messages"
defSvcHTTPPort = "9011"
svcName = "timescaledb-reader"
envPrefixDB = "SMQ_TIMESCALE_"
envPrefixHTTP = "SMQ_TIMESCALE_READER_HTTP_"
envPrefixAuth = "SMQ_AUTH_GRPC_"
envPrefixClients = "SMQ_CLIENTS_AUTH_GRPC_"
envPrefixChannels = "SMQ_CHANNELS_GRPC_"
defDB = "messages"
defSvcHTTPPort = "9011"
)
type config struct {
LogLevel string `env:"MG_TIMESCALE_READER_LOG_LEVEL" envDefault:"info"`
SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MG_TIMESCALE_READER_INSTANCE_ID" envDefault:""`
LogLevel string `env:"SMQ_TIMESCALE_READER_LOG_LEVEL" envDefault:"info"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_TIMESCALE_READER_INSTANCE_ID" envDefault:""`
}
func main() {
@@ -55,13 +55,13 @@ func main() {
log.Fatalf("failed to load %s configuration : %s", svcName, err)
}
logger, err := mglog.New(os.Stdout, cfg.LogLevel)
logger, err := smqlog.New(os.Stdout, cfg.LogLevel)
if err != nil {
log.Fatalf("failed to init logger: %s", err.Error())
}
var exitCode int
defer mglog.ExitWithError(&exitCode)
defer smqlog.ExitWithError(&exitCode)
if cfg.InstanceID == "" {
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
@@ -85,47 +85,54 @@ func main() {
repo := newService(db, logger)
clientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&clientCfg, env.Options{Prefix: envPrefixAuth}); err != nil {
logger.Error(fmt.Sprintf("failed to load auth gRPC client configuration : %s", err))
clientsClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&clientsClientCfg, env.Options{Prefix: envPrefixClients}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s auth configuration : %s", svcName, err))
exitCode = 1
return
}
authz, authzHandler, err := authsvc.NewAuthorization(ctx, clientCfg)
clientsClient, clientsHandler, err := grpcclient.SetupClientsClient(ctx, clientsClientCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer authzHandler.Close()
logger.Info("AuthZ successfully connected to auth gRPC server " + authzHandler.Secure())
defer clientsHandler.Close()
authn, authnHandler, err := authsvcAuthn.NewAuthentication(ctx, clientCfg)
logger.Info("ClientsService gRPC client successfully connected to clients gRPC server " + clientsHandler.Secure())
channelsClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&channelsClientCfg, env.Options{Prefix: envPrefixChannels}); err != nil {
logger.Error(fmt.Sprintf("failed to load channels gRPC client configuration : %s", err))
exitCode = 1
return
}
channelsClient, channelsHandler, err := grpcclient.SetupChannelsClient(ctx, channelsClientCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer channelsHandler.Close()
logger.Info("Channels service gRPC client successfully connected to channels gRPC server " + channelsHandler.Secure())
authnCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&authnCfg, env.Options{Prefix: envPrefixAuth}); err != nil {
logger.Error(fmt.Sprintf("failed to load auth gRPC client configuration : %s", err))
exitCode = 1
return
}
authn, authnHandler, err := authsvc.NewAuthentication(ctx, authnCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer authnHandler.Close()
logger.Info("AuthN successfully connected to auth gRPC server " + authnHandler.Secure())
thingsClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&thingsClientCfg, env.Options{Prefix: envPrefixThings}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s auth configuration : %s", svcName, err))
exitCode = 1
return
}
thingsClient, thingsHandler, err := grpcclient.SetupThingsClient(ctx, thingsClientCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer thingsHandler.Close()
logger.Info("Things service gRPC client successfully connected to things gRPC server " + thingsHandler.Secure())
logger.Info("authn successfully connected to auth gRPC server " + authnHandler.Secure())
httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
@@ -133,10 +140,10 @@ func main() {
exitCode = 1
return
}
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, authn, authz, thingsClient, svcName, cfg.InstanceID), logger)
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(repo, authn, clientsClient, channelsClient, svcName, cfg.InstanceID), logger)
if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
chc := chclient.New(svcName, supermq.Version, logger, cancel)
go chc.CallHome(ctx)
}
@@ -155,9 +162,9 @@ func main() {
func newService(db *sqlx.DB, logger *slog.Logger) readers.MessageRepository {
svc := timescale.New(db)
svc = api.LoggingMiddleware(svc, logger)
svc = httpapi.LoggingMiddleware(svc, logger)
counter, latency := prometheus.MakeMetrics("timescale", "message_reader")
svc = api.MetricsMiddleware(svc, counter, latency)
svc = httpapi.MetricsMiddleware(svc, counter, latency)
return svc
}
+29 -29
View File
@@ -13,20 +13,20 @@ import (
"os"
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
"github.com/absmach/magistrala/consumers"
consumertracing "github.com/absmach/magistrala/consumers/tracing"
"github.com/absmach/magistrala/consumers/writers/api"
"github.com/absmach/magistrala/consumers/writers/timescale"
mglog "github.com/absmach/magistrala/logger"
jaegerclient "github.com/absmach/magistrala/pkg/jaeger"
"github.com/absmach/magistrala/pkg/messaging/brokers"
brokerstracing "github.com/absmach/magistrala/pkg/messaging/brokers/tracing"
pgclient "github.com/absmach/magistrala/pkg/postgres"
"github.com/absmach/magistrala/pkg/prometheus"
"github.com/absmach/magistrala/pkg/server"
httpserver "github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
"github.com/absmach/supermq"
"github.com/absmach/supermq/consumers"
consumertracing "github.com/absmach/supermq/consumers/tracing"
httpapi "github.com/absmach/supermq/consumers/writers/api"
"github.com/absmach/supermq/consumers/writers/timescale"
smqlog "github.com/absmach/supermq/logger"
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
"github.com/absmach/supermq/pkg/messaging/brokers"
brokerstracing "github.com/absmach/supermq/pkg/messaging/brokers/tracing"
pgclient "github.com/absmach/supermq/pkg/postgres"
"github.com/absmach/supermq/pkg/prometheus"
"github.com/absmach/supermq/pkg/server"
httpserver "github.com/absmach/supermq/pkg/server/http"
"github.com/absmach/supermq/pkg/uuid"
"github.com/caarlos0/env/v11"
"github.com/jmoiron/sqlx"
"golang.org/x/sync/errgroup"
@@ -34,20 +34,20 @@ import (
const (
svcName = "timescaledb-writer"
envPrefixDB = "MG_TIMESCALE_"
envPrefixHTTP = "MG_TIMESCALE_WRITER_HTTP_"
envPrefixDB = "SMQ_TIMESCALE_"
envPrefixHTTP = "SMQ_TIMESCALE_WRITER_HTTP_"
defDB = "messages"
defSvcHTTPPort = "9012"
)
type config struct {
LogLevel string `env:"MG_TIMESCALE_WRITER_LOG_LEVEL" envDefault:"info"`
ConfigPath string `env:"MG_TIMESCALE_WRITER_CONFIG_PATH" envDefault:"/config.toml"`
BrokerURL string `env:"MG_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MG_TIMESCALE_WRITER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"`
LogLevel string `env:"SMQ_TIMESCALE_WRITER_LOG_LEVEL" envDefault:"info"`
ConfigPath string `env:"SMQ_TIMESCALE_WRITER_CONFIG_PATH" envDefault:"/config.toml"`
BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"SMQ_TIMESCALE_WRITER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
}
func main() {
@@ -59,13 +59,13 @@ func main() {
log.Fatalf("failed to load %s service configuration : %s", svcName, err)
}
logger, err := mglog.New(os.Stdout, cfg.LogLevel)
logger, err := smqlog.New(os.Stdout, cfg.LogLevel)
if err != nil {
log.Fatalf("failed to init logger: %s", err.Error())
}
var exitCode int
defer mglog.ExitWithError(&exitCode)
defer smqlog.ExitWithError(&exitCode)
if cfg.InstanceID == "" {
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
@@ -127,10 +127,10 @@ func main() {
return
}
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svcName, cfg.InstanceID), logger)
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(svcName, cfg.InstanceID), logger)
if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
chc := chclient.New(svcName, supermq.Version, logger, cancel)
go chc.CallHome(ctx)
}
@@ -149,8 +149,8 @@ func main() {
func newService(db *sqlx.DB, logger *slog.Logger) consumers.BlockingConsumer {
svc := timescale.New(db)
svc = api.LoggingMiddleware(svc, logger)
svc = httpapi.LoggingMiddleware(svc, logger)
counter, latency := prometheus.MakeMetrics("timescale", "message_writer")
svc = api.MetricsMiddleware(svc, counter, latency)
svc = httpapi.MetricsMiddleware(svc, counter, latency)
return svc
}
-387
View File
@@ -1,387 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package main contains users main function to start the users service.
package main
import (
"context"
"fmt"
"log"
"log/slog"
"net/url"
"os"
"regexp"
"time"
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
"github.com/absmach/magistrala/internal/email"
mggroups "github.com/absmach/magistrala/internal/groups"
gevents "github.com/absmach/magistrala/internal/groups/events"
gmiddleware "github.com/absmach/magistrala/internal/groups/middleware"
gpostgres "github.com/absmach/magistrala/internal/groups/postgres"
gtracing "github.com/absmach/magistrala/internal/groups/tracing"
mglog "github.com/absmach/magistrala/logger"
authsvcAuthn "github.com/absmach/magistrala/pkg/authn/authsvc"
mgauthz "github.com/absmach/magistrala/pkg/authz"
authsvcAuthz "github.com/absmach/magistrala/pkg/authz/authsvc"
"github.com/absmach/magistrala/pkg/groups"
"github.com/absmach/magistrala/pkg/grpcclient"
jaegerclient "github.com/absmach/magistrala/pkg/jaeger"
"github.com/absmach/magistrala/pkg/oauth2"
googleoauth "github.com/absmach/magistrala/pkg/oauth2/google"
"github.com/absmach/magistrala/pkg/policies"
"github.com/absmach/magistrala/pkg/policies/spicedb"
"github.com/absmach/magistrala/pkg/postgres"
pgclient "github.com/absmach/magistrala/pkg/postgres"
"github.com/absmach/magistrala/pkg/prometheus"
"github.com/absmach/magistrala/pkg/server"
httpserver "github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
"github.com/absmach/magistrala/users"
capi "github.com/absmach/magistrala/users/api"
"github.com/absmach/magistrala/users/emailer"
uevents "github.com/absmach/magistrala/users/events"
"github.com/absmach/magistrala/users/hasher"
cmiddleware "github.com/absmach/magistrala/users/middleware"
clientspg "github.com/absmach/magistrala/users/postgres"
ctracing "github.com/absmach/magistrala/users/tracing"
"github.com/authzed/authzed-go/v1"
"github.com/authzed/grpcutil"
"github.com/caarlos0/env/v11"
"github.com/go-chi/chi/v5"
"github.com/jmoiron/sqlx"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
const (
svcName = "users"
envPrefixDB = "MG_USERS_DB_"
envPrefixHTTP = "MG_USERS_HTTP_"
envPrefixAuth = "MG_AUTH_GRPC_"
envPrefixGoogle = "MG_GOOGLE_"
defDB = "users"
defSvcHTTPPort = "9002"
streamID = "magistrala.users"
)
type config struct {
LogLevel string `env:"MG_USERS_LOG_LEVEL" envDefault:"info"`
AdminEmail string `env:"MG_USERS_ADMIN_EMAIL" envDefault:"admin@example.com"`
AdminPassword string `env:"MG_USERS_ADMIN_PASSWORD" envDefault:"12345678"`
AdminUsername string `env:"MG_USERS_ADMIN_USERNAME" envDefault:"admin"`
AdminFirstName string `env:"MG_USERS_ADMIN_FIRST_NAME" envDefault:"super"`
AdminLastName string `env:"MG_USERS_ADMIN_LAST_NAME" envDefault:"admin"`
PassRegexText string `env:"MG_USERS_PASS_REGEX" envDefault:"^.{8,}$"`
ResetURL string `env:"MG_TOKEN_RESET_ENDPOINT" envDefault:"/reset-request"`
JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MG_USERS_INSTANCE_ID" envDefault:""`
ESURL string `env:"MG_ES_URL" envDefault:"nats://localhost:4222"`
TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"`
SelfRegister bool `env:"MG_USERS_ALLOW_SELF_REGISTER" envDefault:"false"`
OAuthUIRedirectURL string `env:"MG_OAUTH_UI_REDIRECT_URL" envDefault:"http://localhost:9095/domains"`
OAuthUIErrorURL string `env:"MG_OAUTH_UI_ERROR_URL" envDefault:"http://localhost:9095/error"`
DeleteInterval time.Duration `env:"MG_USERS_DELETE_INTERVAL" envDefault:"24h"`
DeleteAfter time.Duration `env:"MG_USERS_DELETE_AFTER" envDefault:"720h"`
SpicedbHost string `env:"MG_SPICEDB_HOST" envDefault:"localhost"`
SpicedbPort string `env:"MG_SPICEDB_PORT" envDefault:"50051"`
SpicedbPreSharedKey string `env:"MG_SPICEDB_PRE_SHARED_KEY" envDefault:"12345678"`
PassRegex *regexp.Regexp
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
cfg := config{}
if err := env.Parse(&cfg); err != nil {
log.Fatalf("failed to load %s configuration : %s", svcName, err.Error())
}
passRegex, err := regexp.Compile(cfg.PassRegexText)
if err != nil {
log.Fatalf("invalid password validation rules %s\n", cfg.PassRegexText)
}
cfg.PassRegex = passRegex
logger, err := mglog.New(os.Stdout, cfg.LogLevel)
if err != nil {
log.Fatalf("failed to init logger: %s", err.Error())
}
var exitCode int
defer mglog.ExitWithError(&exitCode)
if cfg.InstanceID == "" {
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
logger.Error(fmt.Sprintf("failed to generate instanceID: %s", err))
exitCode = 1
return
}
}
ec := email.Config{}
if err := env.Parse(&ec); err != nil {
logger.Error(fmt.Sprintf("failed to load email configuration : %s", err.Error()))
exitCode = 1
return
}
dbConfig := pgclient.Config{Name: defDB}
if err := env.ParseWithOptions(&dbConfig, env.Options{Prefix: envPrefixDB}); err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
cm := clientspg.Migration()
gm := gpostgres.Migration()
cm.Migrations = append(cm.Migrations, gm.Migrations...)
db, err := pgclient.Setup(dbConfig, *cm)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer db.Close()
tp, err := jaegerclient.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID, cfg.TraceRatio)
if err != nil {
logger.Error(fmt.Sprintf("failed to init Jaeger: %s", err))
exitCode = 1
return
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
logger.Error(fmt.Sprintf("error shutting down tracer provider: %v", err))
}
}()
tracer := tp.Tracer(svcName)
clientConfig := grpcclient.Config{}
if err := env.ParseWithOptions(&clientConfig, env.Options{Prefix: envPrefixAuth}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s auth configuration : %s", svcName, err))
exitCode = 1
return
}
tokenClient, tokenHandler, err := grpcclient.SetupTokenClient(ctx, clientConfig)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer tokenHandler.Close()
logger.Info("Token service client successfully connected to auth gRPC server " + tokenHandler.Secure())
authn, authnHandler, err := authsvcAuthn.NewAuthentication(ctx, clientConfig)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer authnHandler.Close()
logger.Info("AuthN successfully connected to auth gRPC server " + authnHandler.Secure())
authz, authzHandler, err := authsvcAuthz.NewAuthorization(ctx, clientConfig)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer authzHandler.Close()
logger.Info("AuthZ successfully connected to auth gRPC server " + authzHandler.Secure())
domainsClient, domainsHandler, err := grpcclient.SetupDomainsClient(ctx, clientConfig)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer domainsHandler.Close()
logger.Info("DomainsService gRPC client successfully connected to auth gRPC server " + domainsHandler.Secure())
policyService, err := newPolicyService(cfg, logger)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
logger.Info("Policy client successfully connected to spicedb gRPC server")
csvc, gsvc, err := newService(ctx, authz, tokenClient, policyService, domainsClient, db, dbConfig, tracer, cfg, ec, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to setup service: %s", err))
exitCode = 1
return
}
httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err.Error()))
exitCode = 1
return
}
oauthConfig := oauth2.Config{}
if err := env.ParseWithOptions(&oauthConfig, env.Options{Prefix: envPrefixGoogle}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s Google configuration : %s", svcName, err.Error()))
exitCode = 1
return
}
oauthProvider := googleoauth.NewProvider(oauthConfig, cfg.OAuthUIRedirectURL, cfg.OAuthUIErrorURL)
mux := chi.NewRouter()
httpSrv := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, capi.MakeHandler(csvc, authn, tokenClient, cfg.SelfRegister, gsvc, mux, logger, cfg.InstanceID, cfg.PassRegex, oauthProvider), logger)
if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
go chc.CallHome(ctx)
}
g.Go(func() error {
return httpSrv.Start()
})
g.Go(func() error {
return server.StopSignalHandler(ctx, cancel, logger, svcName, httpSrv)
})
if err := g.Wait(); err != nil {
logger.Error(fmt.Sprintf("users service terminated: %s", err))
}
}
func newService(ctx context.Context, authz mgauthz.Authorization, token magistrala.TokenServiceClient, policyService policies.Service, domainsClient magistrala.DomainsServiceClient, db *sqlx.DB, dbConfig pgclient.Config, tracer trace.Tracer, c config, ec email.Config, logger *slog.Logger) (users.Service, groups.Service, error) {
database := postgres.NewDatabase(db, dbConfig, tracer)
cRepo := clientspg.NewRepository(database)
gRepo := gpostgres.New(database)
idp := uuid.New()
hsr := hasher.New()
emailerClient, err := emailer.New(c.ResetURL, &ec)
if err != nil {
logger.Error(fmt.Sprintf("failed to configure e-mailing util: %s", err.Error()))
}
csvc := users.NewService(token, cRepo, policyService, emailerClient, hsr, idp)
gsvc := mggroups.NewService(gRepo, idp, policyService)
csvc, err = uevents.NewEventStoreMiddleware(ctx, csvc, c.ESURL)
if err != nil {
return nil, nil, err
}
gsvc, err = gevents.NewEventStoreMiddleware(ctx, gsvc, c.ESURL, streamID)
if err != nil {
return nil, nil, err
}
csvc = cmiddleware.AuthorizationMiddleware(csvc, authz, c.SelfRegister)
gsvc = gmiddleware.AuthorizationMiddleware(gsvc, authz)
csvc = ctracing.New(csvc, tracer)
csvc = cmiddleware.LoggingMiddleware(csvc, logger)
counter, latency := prometheus.MakeMetrics(svcName, "api")
csvc = cmiddleware.MetricsMiddleware(csvc, counter, latency)
gsvc = gtracing.New(gsvc, tracer)
gsvc = gmiddleware.LoggingMiddleware(gsvc, logger)
counter, latency = prometheus.MakeMetrics("groups", "api")
gsvc = gmiddleware.MetricsMiddleware(gsvc, counter, latency)
userID, err := createAdmin(ctx, c, cRepo, hsr, csvc)
if err != nil {
logger.Error(fmt.Sprintf("failed to create admin client: %s", err))
}
if err := createAdminPolicy(ctx, userID, authz, policyService); err != nil {
return nil, nil, err
}
users.NewDeleteHandler(ctx, cRepo, policyService, domainsClient, c.DeleteInterval, c.DeleteAfter, logger)
return csvc, gsvc, err
}
func createAdmin(ctx context.Context, c config, urepo users.Repository, hsr users.Hasher, svc users.Service) (string, error) {
id, err := uuid.New().ID()
if err != nil {
return "", err
}
hash, err := hsr.Hash(c.AdminPassword)
if err != nil {
return "", err
}
user := users.User{
ID: id,
Email: c.AdminEmail,
FirstName: c.AdminFirstName,
LastName: c.AdminLastName,
Credentials: users.Credentials{
Username: "admin",
Secret: hash,
},
Metadata: users.Metadata{
"role": "admin",
},
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
Role: users.AdminRole,
Status: users.EnabledStatus,
}
if u, err := urepo.RetrieveByEmail(ctx, user.Email); err == nil {
return u.ID, nil
}
// Create an admin
if _, err = urepo.Save(ctx, user); err != nil {
return "", err
}
if _, err = svc.IssueToken(ctx, c.AdminUsername, c.AdminPassword); err != nil {
return "", err
}
return user.ID, nil
}
func createAdminPolicy(ctx context.Context, userID string, authz mgauthz.Authorization, policyService policies.Service) error {
if err := authz.Authorize(ctx, mgauthz.PolicyReq{
SubjectType: policies.UserType,
Subject: userID,
Permission: policies.AdministratorRelation,
Object: policies.MagistralaObject,
ObjectType: policies.PlatformType,
}); err != nil {
err := policyService.AddPolicy(ctx, policies.Policy{
SubjectType: policies.UserType,
Subject: userID,
Relation: policies.AdministratorRelation,
Object: policies.MagistralaObject,
ObjectType: policies.PlatformType,
})
if err != nil {
return err
}
}
return nil
}
func newPolicyService(cfg config, logger *slog.Logger) (policies.Service, error) {
client, err := authzed.NewClientWithExperimentalAPIs(
fmt.Sprintf("%s:%s", cfg.SpicedbHost, cfg.SpicedbPort),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpcutil.WithInsecureBearerToken(cfg.SpicedbPreSharedKey),
)
if err != nil {
return nil, err
}
policySvc := spicedb.NewPolicyService(client, logger)
return policySvc, nil
}
-193
View File
@@ -1,193 +0,0 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// Package main contains websocket-adapter main function to start the websocket-adapter service.
package main
import (
"context"
"fmt"
"log"
"log/slog"
"net/url"
"os"
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
mglog "github.com/absmach/magistrala/logger"
"github.com/absmach/magistrala/pkg/grpcclient"
jaegerclient "github.com/absmach/magistrala/pkg/jaeger"
"github.com/absmach/magistrala/pkg/messaging"
"github.com/absmach/magistrala/pkg/messaging/brokers"
brokerstracing "github.com/absmach/magistrala/pkg/messaging/brokers/tracing"
"github.com/absmach/magistrala/pkg/prometheus"
"github.com/absmach/magistrala/pkg/server"
httpserver "github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
"github.com/absmach/magistrala/ws"
"github.com/absmach/magistrala/ws/api"
"github.com/absmach/magistrala/ws/tracing"
"github.com/absmach/mgate/pkg/session"
"github.com/absmach/mgate/pkg/websockets"
"github.com/caarlos0/env/v11"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)
const (
svcName = "ws-adapter"
envPrefixHTTP = "MG_WS_ADAPTER_HTTP_"
envPrefixThings = "MG_THINGS_AUTH_GRPC_"
defSvcHTTPPort = "8190"
targetWSPort = "8191"
targetWSHost = "localhost"
)
type config struct {
LogLevel string `env:"MG_WS_ADAPTER_LOG_LEVEL" envDefault:"info"`
BrokerURL string `env:"MG_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MG_WS_ADAPTER_INSTANCE_ID" envDefault:""`
TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"`
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
cfg := config{}
if err := env.Parse(&cfg); err != nil {
log.Fatalf("failed to load %s configuration : %s", svcName, err)
}
logger, err := mglog.New(os.Stdout, cfg.LogLevel)
if err != nil {
log.Fatalf("failed to init logger: %s", err.Error())
}
var exitCode int
defer mglog.ExitWithError(&exitCode)
if cfg.InstanceID == "" {
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
logger.Error(fmt.Sprintf("failed to generate instanceID: %s", err))
exitCode = 1
return
}
}
httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
targetServerConfig := server.Config{
Port: targetWSPort,
Host: targetWSHost,
}
thingsClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&thingsClientCfg, env.Options{Prefix: envPrefixThings}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s auth configuration : %s", svcName, err))
exitCode = 1
return
}
thingsClient, thingsHandler, err := grpcclient.SetupThingsClient(ctx, thingsClientCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer thingsHandler.Close()
logger.Info("Things service gRPC client successfully connected to things gRPC server " + thingsHandler.Secure())
tp, err := jaegerclient.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID, cfg.TraceRatio)
if err != nil {
logger.Error(fmt.Sprintf("failed to init Jaeger: %s", err))
exitCode = 1
return
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
logger.Error(fmt.Sprintf("Error shutting down tracer provider: %v", err))
}
}()
tracer := tp.Tracer(svcName)
nps, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to message broker: %s", err))
exitCode = 1
return
}
defer nps.Close()
nps = brokerstracing.NewPubSub(targetServerConfig, tracer, nps)
svc := newService(thingsClient, nps, logger, tracer)
hs := httpserver.NewServer(ctx, cancel, svcName, targetServerConfig, api.MakeHandler(ctx, svc, logger, cfg.InstanceID), logger)
if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
go chc.CallHome(ctx)
}
g.Go(func() error {
g.Go(func() error {
return hs.Start()
})
handler := ws.NewHandler(nps, logger, thingsClient)
return proxyWS(ctx, httpServerConfig, targetServerConfig, logger, handler)
})
g.Go(func() error {
return server.StopSignalHandler(ctx, cancel, logger, svcName, hs)
})
if err := g.Wait(); err != nil {
logger.Error(fmt.Sprintf("WS adapter service terminated: %s", err))
}
}
func newService(thingsClient magistrala.ThingsServiceClient, nps messaging.PubSub, logger *slog.Logger, tracer trace.Tracer) ws.Service {
svc := ws.New(thingsClient, nps)
svc = tracing.New(tracer, svc)
svc = api.LoggingMiddleware(svc, logger)
counter, latency := prometheus.MakeMetrics("ws_adapter", "api")
svc = api.MetricsMiddleware(svc, counter, latency)
return svc
}
func proxyWS(ctx context.Context, hostConfig, targetConfig server.Config, logger *slog.Logger, handler session.Handler) error {
target := fmt.Sprintf("ws://%s:%s", targetConfig.Host, targetConfig.Port)
address := fmt.Sprintf("%s:%s", hostConfig.Host, hostConfig.Port)
wp, err := websockets.NewProxy(address, target, logger, handler)
if err != nil {
return err
}
errCh := make(chan error)
go func() {
if hostConfig.CertFile != "" && hostConfig.KeyFile != "" {
logger.Info(fmt.Sprintf("ws-adapter service http server listening at %s:%s with TLS", hostConfig.Host, hostConfig.Port))
errCh <- wp.ListenTLS(hostConfig.CertFile, hostConfig.KeyFile)
} else {
logger.Info(fmt.Sprintf("ws-adapter service http server listening at %s:%s without TLS", hostConfig.Host, hostConfig.Port))
errCh <- wp.Listen()
}
}()
select {
case <-ctx.Done():
logger.Info(fmt.Sprintf("proxy MQTT WS shutdown at %s", target))
return nil
case err := <-errCh:
return err
}
}