NOISSUE - Improve graceful shutdown and code quality (#1821)

* remove single case switch
remove duplicate cases

Signed-off-by: SammyOina <sammyoina@gmail.com>

* remove single case switch \n format comments

Signed-off-by: SammyOina <sammyoina@gmail.com>

* graceful exit on main func

Signed-off-by: SammyOina <sammyoina@gmail.com>

* remove fatal from imported packages

Signed-off-by: SammyOina <sammyoina@gmail.com>

* reuse exit function

Signed-off-by: SammyOina <sammyoina@gmail.com>

* return nill for empty configs

Signed-off-by: SammyOina <sammyoina@gmail.com>

* return nil for config file not found

Signed-off-by: SammyOina <sammyoina@gmail.com>

---------

Signed-off-by: SammyOina <sammyoina@gmail.com>
Co-authored-by: Drasko DRASKOVIC <drasko.draskovic@gmail.com>
This commit is contained in:
Sammy Kerata Oina
2023-07-28 04:01:27 +03:00
committed by GitHub
parent 33eb8d8bd3
commit 7758f42f6b
50 changed files with 418 additions and 182 deletions
+5 -5
View File
@@ -5,7 +5,6 @@ package cli
import (
"fmt"
"log"
"os"
"github.com/mainflux/mainflux/pkg/errors"
@@ -33,21 +32,21 @@ func read(file string) (Config, error) {
return c, nil
}
func ParseConfig() {
func ParseConfig() error {
if ConfigPath == "" {
// No config file
return
return nil
}
if _, err := os.Stat(ConfigPath); os.IsNotExist(err) {
errConfigNotFound := errors.Wrap(errors.New("config file was not found"), err)
logError(errConfigNotFound)
return
return nil
}
config, err := read(ConfigPath)
if err != nil {
log.Fatal(err)
return err
}
if config.Offset != 0 {
@@ -65,4 +64,5 @@ func ParseConfig() {
if config.RawOutput {
RawOutput = config.RawOutput
}
return nil
}
+14 -4
View File
@@ -87,19 +87,25 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer db.Close()
// Create new redis client for bootstrap event store
esClient, err := redisClient.Setup(envPrefixES)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to setup %s bootstrap event store redis client : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to setup %s bootstrap event store redis client : %s", svcName, err))
exitCode = 1
return
}
defer esClient.Close()
// Create new auth grpc client api
auth, authHandler, err := authClient.Setup(envPrefix, svcName)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer authHandler.Close()
logger.Info("Successfully connected to auth grpc server " + authHandler.Secure())
@@ -121,7 +127,9 @@ func main() {
// Create an new HTTP server
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, bootstrap.NewConfigReader([]byte(cfg.EncKey)), logger, instanceID), logger)
@@ -142,7 +150,9 @@ func main() {
// Subscribe to things event store
thingsESClient, err := redisClient.Setup(envPrefixES)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer thingsESClient.Close()
+11 -3
View File
@@ -70,13 +70,17 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer tcHandler.Close()
logger.Info("Successfully connected to things grpc server " + tcHandler.Secure())
// Create new auth grpc client
auth, authHandler, err := authClient.Setup(envPrefix, svcName)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer authHandler.Close()
logger.Info("Successfully connected to auth grpc server " + authHandler.Secure())
@@ -84,7 +88,9 @@ func main() {
// Create new cassandra client
csdSession, err := cassandraClient.Setup(envPrefix)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer csdSession.Close()
@@ -95,7 +101,9 @@ func main() {
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, tc, auth, svcName, instanceID), logger)
+8 -2
View File
@@ -75,6 +75,8 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer csdSession.Close()
tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL, instanceID)
@@ -90,7 +92,9 @@ func main() {
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefix, AltPrefix: envPrefixHttp}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
// Create new cassandra-writer repo
@@ -100,7 +104,9 @@ func main() {
// Create new pub sub broker
pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
pubSub = tracing.NewPubSub(tracer, pubSub)
defer pubSub.Close()
+5 -1
View File
@@ -100,11 +100,15 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer db.Close()
auth, authHandler, err := authClient.Setup(envPrefix, svcName)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer authHandler.Close()
logger.Info("Successfully connected to auth grpc server " + authHandler.Secure())
+3 -1
View File
@@ -32,7 +32,9 @@ func main() {
var rootCmd = &cobra.Command{
Use: "mainflux-cli",
PersistentPreRun: func(cmd *cobra.Command, args []string) {
cli.ParseConfig()
if err := cli.ParseConfig(); err != nil {
log.Fatal(err)
}
sdkConf.MsgContentType = sdk.ContentType(msgContentType)
s := sdk.NewSDK(sdkConf)
+11 -3
View File
@@ -72,6 +72,8 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer tcHandler.Close()
logger.Info("Successfully connected to things grpc server " + tcHandler.Secure())
@@ -88,7 +90,9 @@ func main() {
nps, err := brokers.NewPubSub(cfg.BrokerURL, "", logger)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
nps = pstracing.NewPubSub(tracer, nps)
defer nps.Close()
@@ -104,13 +108,17 @@ func main() {
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(instanceID), logger)
coapServerConfig := server.Config{Port: defSvcCoapPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixCoap, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s CoAP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s CoAP server configuration : %s", svcName, err))
exitCode = 1
return
}
cs := coapserver.New(ctx, cancel, svcName, coapServerConfig, api.MakeCoAPHandler(svc, logger), logger)
+8 -2
View File
@@ -73,6 +73,8 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer tcHandler.Close()
logger.Info("Successfully connected to things grpc server " + tcHandler.Secure())
@@ -89,7 +91,9 @@ func main() {
pub, err := brokers.NewPublisher(cfg.BrokerURL)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
pub = pstracing.New(tracer, pub)
defer pub.Close()
@@ -98,7 +102,9 @@ func main() {
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, instanceID), logger)
+14 -4
View File
@@ -69,19 +69,25 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer tcHandler.Close()
logger.Info("Successfully connected to things grpc server " + tcHandler.Secure())
auth, authHandler, err := authClient.Setup(envPrefix, svcName)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer authHandler.Close()
logger.Info("Successfully connected to auth grpc server " + authHandler.Secure())
influxDBConfig := influxDBClient.Config{}
if err := env.Parse(&influxDBConfig, env.Options{Prefix: envPrefixInfluxdb}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load InfluxDB client configuration from environment variable : %s", err))
logger.Error(fmt.Sprintf("failed to load InfluxDB client configuration from environment variable : %s", err))
exitCode = 1
return
}
influxDBConfig.DBUrl = fmt.Sprintf("%s://%s:%s", influxDBConfig.Protocol, influxDBConfig.Host, influxDBConfig.Port)
@@ -92,7 +98,9 @@ func main() {
client, err := influxDBClient.Connect(influxDBConfig, ctx)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to InfluxDB : %s", err))
logger.Error(fmt.Sprintf("failed to connect to InfluxDB : %s", err))
exitCode = 1
return
}
defer client.Close()
@@ -100,7 +108,9 @@ func main() {
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, tc, auth, svcName, instanceID), logger)
+17 -5
View File
@@ -71,6 +71,8 @@ func main() {
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err))
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer func() {
if err := tp.Shutdown(ctx); err != nil {
logger.Error(fmt.Sprintf("Error shutting down tracer provider: %v", err))
@@ -80,14 +82,18 @@ func main() {
pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
pubSub = tracing.NewPubSub(tracer, pubSub)
defer pubSub.Close()
influxDBConfig := influxDBClient.Config{}
if err := env.Parse(&influxDBConfig, env.Options{Prefix: envPrefixInfluxdb}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load InfluxDB client configuration from environment variable : %s", err))
logger.Error(fmt.Sprintf("failed to load InfluxDB client configuration from environment variable : %s", err))
exitCode = 1
return
}
influxDBConfig.DBUrl = fmt.Sprintf("%s://%s:%s", influxDBConfig.Protocol, influxDBConfig.Host, influxDBConfig.Port)
@@ -98,13 +104,17 @@ func main() {
client, err := influxDBClient.Connect(influxDBConfig, ctx)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to InfluxDB : %s", err))
logger.Error(fmt.Sprintf("failed to connect to InfluxDB : %s", err))
exitCode = 1
return
}
defer client.Close()
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
repo := influxdb.NewAsync(client, repocfg)
@@ -120,7 +130,9 @@ func main() {
}(logger)
if err := consumers.Start(ctx, svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil {
logger.Fatal(fmt.Sprintf("failed to start InfluxDB writer: %s", err))
logger.Error(fmt.Sprintf("failed to start InfluxDB writer: %s", err))
exitCode = 1
return
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svcName, instanceID), logger)
+27 -11
View File
@@ -87,6 +87,8 @@ func main() {
if err != nil {
logger.Fatal(fmt.Sprintf("failed to setup route map redis client : %s", err))
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer rmConn.Close()
tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL, instanceID)
@@ -102,7 +104,9 @@ func main() {
pub, err := brokers.NewPublisher(cfg.BrokerURL)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
pub = tracing.New(tracer, pub)
defer pub.Close()
@@ -111,18 +115,29 @@ func main() {
esConn, err := redisClient.Setup(envPrefixThingsES)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to setup things event store redis client : %s", err))
logger.Error(fmt.Sprintf("failed to setup things event store redis client : %s", err))
exitCode = 1
return
}
defer esConn.Close()
mqttConn := connectToMQTTBroker(cfg.LoraMsgURL, cfg.LoraMsgUser, cfg.LoraMsgPass, cfg.LoraMsgTimeout, logger)
mqttConn, err := connectToMQTTBroker(cfg.LoraMsgURL, cfg.LoraMsgUser, cfg.LoraMsgPass, cfg.LoraMsgTimeout, logger)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
go subscribeToLoRaBroker(svc, mqttConn, cfg.LoraMsgTimeout, cfg.LoraMsgTopic, logger)
g.Go(func() error {
return subscribeToLoRaBroker(svc, mqttConn, cfg.LoraMsgTimeout, cfg.LoraMsgTopic, logger)
})
go subscribeToThingsES(svc, esConn, cfg.ESConsumerName, logger)
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(instanceID), logger)
@@ -144,7 +159,7 @@ func main() {
}
}
func connectToMQTTBroker(url, user, password string, timeout time.Duration, logger mflog.Logger) mqttPaho.Client {
func connectToMQTTBroker(url, user, password string, timeout time.Duration, logger mflog.Logger) (mqttPaho.Client, error) {
opts := mqttPaho.NewClientOptions()
opts.AddBroker(url)
opts.SetUsername(user)
@@ -153,24 +168,25 @@ func connectToMQTTBroker(url, user, password string, timeout time.Duration, logg
logger.Info("Connected to Lora MQTT broker")
})
opts.SetConnectionLostHandler(func(c mqttPaho.Client, err error) {
logger.Fatal(fmt.Sprintf("MQTT connection lost: %s", err))
logger.Error(fmt.Sprintf("MQTT connection lost: %s", err))
})
client := mqttPaho.NewClient(opts)
if token := client.Connect(); token.WaitTimeout(timeout) && token.Error() != nil {
logger.Fatal(fmt.Sprintf("failed to connect to Lora MQTT broker: %s", token.Error()))
return nil, fmt.Errorf("failed to connect to Lora MQTT broker: %s", token.Error())
}
return client
return client, nil
}
func subscribeToLoRaBroker(svc lora.Service, mc mqttPaho.Client, timeout time.Duration, topic string, logger mflog.Logger) {
func subscribeToLoRaBroker(svc lora.Service, mc mqttPaho.Client, timeout time.Duration, topic string, logger mflog.Logger) error {
mqtt := mqtt.NewBroker(svc, mc, timeout, logger)
logger.Info("Subscribed to Lora MQTT broker")
if err := mqtt.Subscribe(topic); err != nil {
logger.Fatal(fmt.Sprintf("failed to subscribe to Lora MQTT broker: %s", err))
return fmt.Errorf("failed to subscribe to Lora MQTT broker: %s", err)
}
return nil
}
func subscribeToThingsES(svc lora.Service, client *r.Client, consumer string, logger mflog.Logger) {
+7 -1
View File
@@ -75,19 +75,25 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer tcHandler.Close()
logger.Info("Successfully connected to things grpc server " + tcHandler.Secure())
auth, authHandler, err := authClient.Setup(envPrefix, svcName)
if err != nil {
logger.Fatal(err.Error())
exitCode = 1
return
}
defer authHandler.Close()
logger.Info("Successfully connected to auth grpc server " + authHandler.Secure())
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, tc, auth, svcName, instanceID), logger)
+14 -4
View File
@@ -73,6 +73,8 @@ func main() {
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err))
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer func() {
if err := tp.Shutdown(ctx); err != nil {
logger.Error(fmt.Sprintf("Error shutting down tracer provider: %v", err))
@@ -82,26 +84,34 @@ func main() {
pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
pubSub = tracing.NewPubSub(tracer, pubSub)
defer pubSub.Close()
db, err := mongoClient.Setup(envPrefixDB)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to setup mongo database : %s", err))
logger.Error(fmt.Sprintf("failed to setup mongo database : %s", err))
exitCode = 1
return
}
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
repo := newService(db, logger)
repo = consumerTracing.NewBlocking(tracer, repo, httpServerConfig)
if err := consumers.Start(ctx, svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil {
logger.Fatal(fmt.Sprintf("failed to start MongoDB writer: %s", err))
logger.Error(fmt.Sprintf("failed to start MongoDB writer: %s", err))
exitCode = 1
return
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svcName, instanceID), logger)
+23 -7
View File
@@ -98,6 +98,8 @@ func main() {
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err))
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer func() {
if err := tp.Shutdown(ctx); err != nil {
logger.Error(fmt.Sprintf("Error shutting down tracer provider: %v", err))
@@ -107,33 +109,43 @@ func main() {
nps, err := brokers.NewPubSub(cfg.BrokerURL, "mqtt", logger)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
nps = tracing.NewPubSub(tracer, nps)
defer nps.Close()
mpub, err := mqttpub.NewPublisher(fmt.Sprintf("%s:%s", cfg.MqttTargetHost, cfg.MqttTargetPort), cfg.MqttForwarderTimeout)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to create MQTT publisher: %s", err))
logger.Error(fmt.Sprintf("failed to create MQTT publisher: %s", err))
exitCode = 1
return
}
mpub = tracing.New(tracer, mpub)
fwd := mqtt.NewForwarder(brokers.SubjectAllChannels, logger)
fwd = mqtttracing.New(tracer, fwd, brokers.SubjectAllChannels)
if err := fwd.Forward(ctx, svcName, nps, mpub); err != nil {
logger.Fatal(fmt.Sprintf("failed to forward message broker messages: %s", err))
logger.Error(fmt.Sprintf("failed to forward message broker messages: %s", err))
exitCode = 1
return
}
np, err := brokers.NewPublisher(cfg.BrokerURL)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
np = tracing.New(tracer, np)
defer np.Close()
ec, err := redisClient.Setup(envPrefixES)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to setup %s event store redis client : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to setup %s event store redis client : %s", svcName, err))
exitCode = 1
return
}
defer ec.Close()
@@ -141,13 +153,17 @@ func main() {
ac, err := redisClient.Setup(envPrefixAuthCache)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to setup %s event store redis client : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to setup %s event store redis client : %s", svcName, err))
exitCode = 1
return
}
defer ac.Close()
tc, tcHandler, err := thingsClient.Setup(envPrefix)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer tcHandler.Close()
logger.Info("Successfully connected to things grpc server " + tcHandler.Secure())
+8 -2
View File
@@ -94,6 +94,8 @@ func main() {
if err != nil {
logger.Fatal(fmt.Sprintf("failed to setup %s bootstrap event store redis client : %s", svcName, err))
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer esConn.Close()
tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL, instanceID)
@@ -109,7 +111,9 @@ func main() {
pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
pubSub = tracing.NewPubSub(tracer, pubSub)
defer pubSub.Close()
@@ -124,7 +128,9 @@ func main() {
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
hs := httpserver.New(ctx, httpCancel, svcName, httpServerConfig, api.MakeHandler(svc, logger, instanceID), logger)
+14 -4
View File
@@ -69,23 +69,31 @@ func main() {
}
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer tcHandler.Close()
logger.Info("Successfully connected to things grpc server " + tcHandler.Secure())
auth, authHandler, err := authClient.Setup(envPrefix, svcName)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer authHandler.Close()
logger.Info("Successfully connected to auth grpc server " + authHandler.Secure())
dbConfig := pgClient.Config{Name: defDB}
if err := dbConfig.LoadEnv(envPrefix); err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
db, err := pgClient.Connect(dbConfig)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to setup postgres database : %s", err))
logger.Error(fmt.Sprintf("failed to setup postgres database : %s", err))
exitCode = 1
return
}
defer db.Close()
@@ -93,7 +101,9 @@ func main() {
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, tc, auth, svcName, instanceID), logger)
+14 -4
View File
@@ -73,6 +73,8 @@ func main() {
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err))
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer func() {
if err := tp.Shutdown(ctx); err != nil {
logger.Error(fmt.Sprintf("Error shutting down tracer provider: %v", err))
@@ -82,7 +84,9 @@ func main() {
pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
pubSub = tracing.NewPubSub(tracer, pubSub)
defer pubSub.Close()
@@ -90,20 +94,26 @@ func main() {
dbConfig := pgClient.Config{Name: defDB}
db, err := pgClient.SetupWithConfig(envPrefix, *writerPg.Migration(), dbConfig)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer db.Close()
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
repo := newService(db, logger)
repo = consumerTracing.NewBlocking(tracer, repo, httpServerConfig)
if err = consumers.Start(ctx, svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil {
logger.Fatal(fmt.Sprintf("failed to create Postgres writer: %s", err))
logger.Error(fmt.Sprintf("failed to create Postgres writer: %s", err))
exitCode = 1
return
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svcName, instanceID), logger)
+17 -5
View File
@@ -82,11 +82,15 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer db.Close()
smppConfig := mfsmpp.Config{}
if err := env.Parse(&smppConfig); err != nil {
logger.Fatal(fmt.Sprintf("failed to load SMPP configuration from environment : %s", err))
logger.Error(fmt.Sprintf("failed to load SMPP configuration from environment : %s", err))
exitCode = 1
return
}
tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL, instanceID)
@@ -102,26 +106,34 @@ func main() {
pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
pubSub = pstracing.NewPubSub(tracer, pubSub)
defer pubSub.Close()
auth, authHandler, err := authClient.Setup(envPrefix, svcName)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer authHandler.Close()
logger.Info("Successfully connected to auth grpc server " + authHandler.Secure())
svc := newService(db, tracer, auth, cfg, smppConfig, logger)
if err = consumers.Start(ctx, svcName, pubSub, svc, cfg.ConfigPath, logger); err != nil {
logger.Fatal(fmt.Sprintf("failed to create Postgres writer: %s", err))
logger.Error(fmt.Sprintf("failed to create Postgres writer: %s", err))
exitCode = 1
return
}
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, logger, instanceID), logger)
+29 -10
View File
@@ -83,16 +83,22 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer db.Close()
ec := email.Config{}
if err := env.Parse(&ec); err != nil {
logger.Fatal(fmt.Sprintf("failed to load email configuration : %s", err))
logger.Error(fmt.Sprintf("failed to load email configuration : %s", err))
exitCode = 1
return
}
tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL, instanceID)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err))
logger.Error(fmt.Sprintf("failed to init Jaeger: %s", err))
exitCode = 1
return
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
@@ -103,27 +109,40 @@ func main() {
pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
pubSub = pstracing.NewPubSub(tracer, pubSub)
defer pubSub.Close()
auth, authHandler, err := authClient.Setup(envPrefix, svcName)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer authHandler.Close()
logger.Info("Successfully connected to auth grpc server " + authHandler.Secure())
svc := newService(db, tracer, auth, cfg, ec, logger)
svc, err := newService(db, tracer, auth, cfg, ec, logger)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
if err = consumers.Start(ctx, svcName, pubSub, svc, cfg.ConfigPath, logger); err != nil {
logger.Fatal(fmt.Sprintf("failed to create Postgres writer: %s", err))
logger.Error(fmt.Sprintf("failed to create Postgres writer: %s", err))
exitCode = 1
return
}
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, logger, instanceID), logger)
@@ -146,14 +165,14 @@ func main() {
}
func newService(db *sqlx.DB, tracer trace.Tracer, auth policies.AuthServiceClient, c config, ec email.Config, logger mflog.Logger) notifiers.Service {
func newService(db *sqlx.DB, tracer trace.Tracer, auth policies.AuthServiceClient, c config, ec email.Config, logger mflog.Logger) (notifiers.Service, error) {
database := notifierPg.NewDatabase(db, tracer)
repo := tracing.New(tracer, notifierPg.New(database))
idp := ulid.New()
agent, err := email.New(&ec)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to create email agent: %s", err))
return nil, fmt.Errorf("failed to create email agent: %s", err)
}
notifier := smtp.New(agent)
@@ -162,5 +181,5 @@ func newService(db *sqlx.DB, tracer trace.Tracer, auth policies.AuthServiceClien
counter, latency := internal.MakeMetrics("notifier", "smtp")
svc = api.MetricsMiddleware(svc, counter, latency)
return svc
return svc, nil
}
+17 -5
View File
@@ -105,6 +105,8 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer db.Close()
tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL, instanceID)
@@ -121,14 +123,18 @@ func main() {
// Setup new redis cache client
cacheClient, err := redisClient.Setup(envPrefixCache)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer cacheClient.Close()
// Setup new redis event store client
esClient, err := redisClient.Setup(envPrefixES)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer esClient.Close()
@@ -140,7 +146,9 @@ func main() {
default:
authServiceClient, authHandler, err := authClient.Setup(envPrefix, svcName)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer authHandler.Close()
auth = authServiceClient
@@ -151,7 +159,9 @@ func main() {
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s gRPC server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s gRPC server configuration : %s", svcName, err))
exitCode = 1
return
}
mux := bone.New()
hsp := httpserver.New(ctx, cancel, "things-policies", httpServerConfig, httpapi.MakeHandler(csvc, psvc, mux, logger), logger)
@@ -164,7 +174,9 @@ func main() {
}
grpcServerConfig := server.Config{Port: defSvcAuthGrpcPort}
if err := env.Parse(&grpcServerConfig, env.Options{Prefix: envPrefixAuthGrpc, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s gRPC server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s gRPC server configuration : %s", svcName, err))
exitCode = 1
return
}
gs := grpcserver.New(ctx, cancel, svcName, grpcServerConfig, registerThingsServiceServer, logger)
+11 -3
View File
@@ -72,27 +72,35 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer db.Close()
repo := newService(db, logger)
auth, authHandler, err := authClient.Setup(envPrefix, svcName)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer authHandler.Close()
logger.Info("Successfully connected to auth grpc server " + authHandler.Secure())
tc, tcHandler, err := thingsClient.Setup(envPrefix)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer tcHandler.Close()
logger.Info("Successfully connected to things grpc server " + tcHandler.Secure())
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, tc, auth, svcName, instanceID), logger)
+11 -3
View File
@@ -75,6 +75,8 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer db.Close()
tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL, instanceID)
@@ -90,7 +92,9 @@ func main() {
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
repo := newService(db, logger)
@@ -98,13 +102,17 @@ func main() {
pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
pubSub = tracing.NewPubSub(tracer, pubSub)
defer pubSub.Close()
if err = consumers.Start(ctx, svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil {
logger.Fatal(fmt.Sprintf("failed to create Timescale writer: %s", err))
logger.Error(fmt.Sprintf("failed to create Timescale writer: %s", err))
exitCode = 1
return
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svcName, instanceID), logger)
+17 -5
View File
@@ -86,6 +86,8 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer cacheClient.Close()
// Setup new redis event store client
@@ -97,12 +99,16 @@ func main() {
db, err := mongoClient.Setup(envPrefix)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to setup postgres database : %s", err))
logger.Error(fmt.Sprintf("failed to setup postgres database : %s", err))
exitCode = 1
return
}
tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL, instanceID)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err))
logger.Error(fmt.Sprintf("failed to init Jaeger: %s", err))
exitCode = 1
return
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
@@ -118,7 +124,9 @@ func main() {
default:
authServiceClient, authHandler, err := authClient.Setup(envPrefix, svcName)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer authHandler.Close()
auth = authServiceClient
@@ -127,7 +135,9 @@ func main() {
pubSub, err := brokers.NewPubSub(cfg.BrokerURL, queue, logger)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
pubSub = pstracing.NewPubSub(tracer, pubSub)
defer pubSub.Close()
@@ -136,7 +146,9 @@ func main() {
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, twapi.MakeHandler(svc, logger, instanceID), logger)
+11 -3
View File
@@ -114,11 +114,15 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer db.Close()
tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL, instanceID)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err))
logger.Error(fmt.Sprintf("failed to init Jaeger: %s", err))
exitCode = 1
return
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
@@ -131,7 +135,9 @@ func main() {
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err.Error()))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err.Error()))
exitCode = 1
return
}
mux := bone.New()
hsc := httpserver.New(ctx, cancel, svcName, httpServerConfig, capi.MakeHandler(csvc, mux, logger, instanceID), logger)
@@ -145,7 +151,9 @@ func main() {
}
grpcServerConfig := server.Config{Port: defSvcGrpcPort}
if err := env.Parse(&grpcServerConfig, env.Options{Prefix: envPrefixGrpc, AltPrefix: envPrefix}); err != nil {
log.Fatalf("failed to load %s gRPC server configuration : %s", svcName, err.Error())
logger.Error(fmt.Sprintf("failed to load %s gRPC server configuration : %s", svcName, err.Error()))
exitCode = 1
return
}
gs := grpcserver.New(ctx, cancel, svcName, grpcServerConfig, registerAuthServiceServer, logger)
+11 -3
View File
@@ -73,12 +73,16 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer internal.Close(logger, tcHandler)
logger.Info("Successfully connected to things grpc server " + tcHandler.Secure())
tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL, instanceID)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err))
logger.Error(fmt.Sprintf("failed to init Jaeger: %s", err))
exitCode = 1
return
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
@@ -89,7 +93,9 @@ func main() {
nps, err := brokers.NewPubSub(cfg.BrokerURL, "", logger)
if err != nil {
logger.Fatal(fmt.Sprintf("Failed to connect to message broker: %s", err))
logger.Error(fmt.Sprintf("Failed to connect to message broker: %s", err))
exitCode = 1
return
}
nps = pstracing.NewPubSub(tracer, nps)
defer nps.Close()
@@ -98,7 +104,9 @@ func main() {
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, logger, instanceID), logger)
+1 -4
View File
@@ -31,10 +31,7 @@ func (svc authServiceMock) Identify(ctx context.Context, in *policies.Token, opt
func (svc authServiceMock) Issue(ctx context.Context, in *policies.IssueReq, opts ...grpc.CallOption) (*policies.Token, error) {
if id, ok := svc.users[in.GetEmail()]; ok {
switch in.Type {
default:
return &policies.Token{Value: id}, nil
}
return &policies.Token{Value: id}, nil
}
return nil, errors.ErrAuthentication
}
+1 -2
View File
@@ -83,8 +83,7 @@ func (pr postgresRepo) saveSenml(ctx context.Context, messages interface{}) (err
if _, err := tx.NamedExec(q, m); err != nil {
pgErr, ok := err.(*pgconn.PgError)
if ok {
switch pgErr.Code {
case pgerrcode.InvalidTextRepresentation:
if pgErr.Code == pgerrcode.InvalidTextRepresentation {
return errors.Wrap(errSaveMessage, errInvalidMessage)
}
}
+1 -2
View File
@@ -78,8 +78,7 @@ func (tr timescaleRepo) saveSenml(ctx context.Context, messages interface{}) (er
if _, err := tx.NamedExec(q, m); err != nil {
pgErr, ok := err.(*pgconn.PgError)
if ok {
switch pgErr.Code {
case pgerrcode.InvalidTextRepresentation:
if pgErr.Code == pgerrcode.InvalidTextRepresentation {
return errors.Wrap(errSaveMessage, errInvalidMessage)
}
}
+12 -12
View File
@@ -71,23 +71,23 @@ func New(c *Config) (*Agent, error) {
}
// Send sends e-mail.
func (a *Agent) Send(To []string, From, Subject, Header, User, Content, Footer string) error {
func (a *Agent) Send(to []string, from, subject, header, user, content, footer string) error {
if a.tmpl == nil {
return errMissingEmailTemplate
}
buff := new(bytes.Buffer)
e := email{
To: To,
From: From,
Subject: Subject,
Header: Header,
User: User,
Content: Content,
Host: strings.Split(Content, "?")[0],
Footer: Footer,
To: to,
From: from,
Subject: subject,
Header: header,
User: user,
Content: content,
Host: strings.Split(content, "?")[0],
Footer: footer,
}
if From == "" {
if from == "" {
from := mail.Address{Name: a.conf.FromName, Address: a.conf.FromAddress}
e.From = from.String()
}
@@ -98,8 +98,8 @@ func (a *Agent) Send(To []string, From, Subject, Header, User, Content, Footer s
m := gomail.NewMessage()
m.SetHeader("From", e.From)
m.SetHeader("To", To...)
m.SetHeader("Subject", Subject)
m.SetHeader("To", to...)
m.SetHeader("Subject", subject)
m.SetBody("text/plain", buff.String())
if err := a.dial.DialAndSend(m); err != nil {
+8
View File
@@ -0,0 +1,8 @@
package logger
import "os"
// ExitWithError closes the current process with error code.
func ExitWithError(code *int) {
os.Exit(*code)
}
+1 -1
View File
@@ -266,7 +266,7 @@ func parseSubtopic(subtopic string) (string, error) {
if err != nil {
return "", ErrMalformedSubtopic
}
subtopic = strings.Replace(subtopic, "/", ".", -1)
subtopic = strings.ReplaceAll(subtopic, "/", ".")
elems := strings.Split(subtopic, ".")
filteredElems := []string{}
+1 -1
View File
@@ -41,7 +41,7 @@ var (
topics = []string{topic}
invalidTopics = []string{invalidTopic}
invalidChanIDTopics = []string{fmt.Sprintf(topicMsg, invalidTopic)}
//Test log messages for cases the handler does not provide a return value.
// Test log messages for cases the handler does not provide a return value.
logBuffer = bytes.Buffer{}
sessionClient = session.Session{
ID: clientID,
+2 -2
View File
@@ -88,13 +88,13 @@ func TestPublisher(t *testing.T) {
}
func TestPubsub(t *testing.T) {
// Test Subscribe and Unsubscribe
// Test Subscribe and Unsubscribe.
subcases := []struct {
desc string
topic string
clientID string
errorMessage error
pubsub bool //true for subscribe and false for unsubscribe
pubsub bool // true for subscribe and false for unsubscribe.
handler messaging.MessageHandler
}{
{
+1 -1
View File
@@ -83,5 +83,5 @@ func (pub *publisher) Close() error {
}
func formatTopic(topic string) string {
return strings.Replace(topic, ">", "#", -1)
return strings.ReplaceAll(topic, ">", "#")
}
+2 -2
View File
@@ -19,7 +19,7 @@ func (sdk mfSDK) SendMessage(chanName, msg, key string) errors.SDKError {
chanID := chanNameParts[0]
subtopicPart := ""
if len(chanNameParts) == channelParts {
subtopicPart = fmt.Sprintf("/%s", strings.Replace(chanNameParts[1], ".", "/", -1))
subtopicPart = fmt.Sprintf("/%s", strings.ReplaceAll(chanNameParts[1], ".", "/"))
}
url := fmt.Sprintf("%s/channels/%s/messages/%s", sdk.httpAdapterURL, chanID, subtopicPart)
@@ -34,7 +34,7 @@ func (sdk mfSDK) ReadMessages(chanName, token string) (MessagesPage, errors.SDKE
chanID := chanNameParts[0]
subtopicPart := ""
if len(chanNameParts) == channelParts {
subtopicPart = fmt.Sprintf("?subtopic=%s", strings.Replace(chanNameParts[1], ".", "/", -1))
subtopicPart = fmt.Sprintf("?subtopic=%s", strings.ReplaceAll(chanNameParts[1], ".", "/"))
}
url := fmt.Sprintf("%s/channels/%s/messages%s", sdk.readerURL, chanID, subtopicPart)
+1 -2
View File
@@ -119,8 +119,7 @@ func (ts *transformerService) Transform(msg *messaging.Message) (interface{}, er
// of the Flatten function.
func ParseFlat(flat interface{}) interface{} {
msg := make(map[string]interface{})
switch v := flat.(type) {
case map[string]interface{}:
if v, ok := flat.(map[string]interface{}); ok {
for key, value := range v {
if value == nil {
continue
+2 -2
View File
@@ -164,13 +164,13 @@ func fmtCondition(chanID string, rpm readers.PageMetadata) (string, string) {
return sb.String(), timeRange
}
//range(start:...) is a must for FluxQL syntax
// range(start:...) is a must for FluxQL syntax.
from := `start: time(v:0)`
if value, ok := query["from"]; ok {
fromValue := int64(value.(float64)*1e9) - 1
from = fmt.Sprintf(`start: time(v: %d )`, fromValue)
}
//range(...,stop:) is an option for FluxQL syntax
// range(...,stop:) is an option for FluxQL syntax.
to := ""
if value, ok := query["to"]; ok {
toValue := int64(value.(float64) * 1e9)
+1 -4
View File
@@ -37,10 +37,7 @@ func (svc authServiceMock) Identify(ctx context.Context, in *policies.Token, opt
func (svc authServiceMock) Issue(ctx context.Context, in *policies.IssueReq, opts ...grpc.CallOption) (*policies.Token, error) {
if id, ok := svc.users[in.GetEmail()]; ok {
switch in.Type {
default:
return &policies.Token{Value: id}, nil
}
return &policies.Token{Value: id}, nil
}
return nil, errors.ErrAuthentication
}
+1 -1
View File
@@ -39,7 +39,7 @@ func (req createGroupsReq) validate() error {
return apiutil.ErrBearerToken
}
if len(req.Groups) <= 0 {
if len(req.Groups) == 0 {
return apiutil.ErrEmptyList
}
+20 -10
View File
@@ -9,7 +9,6 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"os"
"strconv"
"time"
@@ -19,11 +18,13 @@ import (
)
// Benchmark - main benchmarking function.
func Benchmark(cfg Config) {
checkConnection(cfg.MQTT.Broker.URL, 1)
func Benchmark(cfg Config) error {
if err := checkConnection(cfg.MQTT.Broker.URL, 1); err != nil {
return err
}
logger, err := mflog.New(os.Stdout, mflog.Debug.String())
if err != nil {
log.Fatalf(err.Error())
return err
}
subsResults := map[string](*[]float64){}
@@ -44,12 +45,12 @@ func Benchmark(cfg Config) {
data, err := os.ReadFile(cfg.Mf.ConnFile)
if err != nil {
logger.Fatal(fmt.Sprintf("Error loading connections file: %s", err))
return fmt.Errorf("Error loading connections file: %s", err)
}
mf := mainflux{}
if err := toml.Unmarshal(data, &mf); err != nil {
logger.Fatal(fmt.Sprintf("Cannot load Mainflux connections config %s \nUse tools/provision to create file", cfg.Mf.ConnFile))
return fmt.Errorf("Cannot load Mainflux connections config %s \nUse tools/provision to create file", cfg.Mf.ConnFile)
}
resCh := make(chan *runResults)
@@ -70,15 +71,23 @@ func Benchmark(cfg Config) {
if cfg.MQTT.TLS.MTLS {
cert, err = tls.X509KeyPair([]byte(mfThing.MTLSCert), []byte(mfThing.MTLSKey))
if err != nil {
logger.Fatal(err.Error())
return err
}
}
c, err := makeClient(i, cfg, mfChan, mfThing, startStamp, caByte, cert)
if err != nil {
logger.Fatal(fmt.Sprintf("Unable to create message payload %s", err.Error()))
return fmt.Errorf("Unable to create message payload %s", err.Error())
}
go c.publish(resCh)
errorChan := make(chan error)
go c.publish(resCh, errorChan)
for {
err := <-errorChan
if err != nil {
return err
}
}
}
// Collect the results
@@ -100,11 +109,12 @@ func Benchmark(cfg Config) {
totalTime := time.Since(start)
totals := calculateTotalResults(results, totalTime, subsResults)
if totals == nil {
return
return fmt.Errorf("totals not assigned")
}
// Print sats
printResults(results, totals, cfg.MQTT.Message.Format, cfg.Log.Quiet)
return nil
}
func getBytePayload(size int, m message) (handler, error) {
+8 -7
View File
@@ -7,6 +7,7 @@ import (
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"log"
"net"
@@ -55,7 +56,7 @@ type message struct {
type handler func(*message) ([]byte, error)
func (c *Client) publish(r chan *runResults) {
func (c *Client) publish(r chan *runResults, errChan chan<- error) {
res := &runResults{}
times := make([]*float64, c.MsgCount)
@@ -67,7 +68,6 @@ func (c *Client) publish(r chan *runResults) {
times[i] = calcMsgRes(&m, res)
}
r <- calcRes(res, start, arr(times))
return
}
if !c.Quiet {
log.Printf("Client %v is connected to the broker %v\n", c.ID, c.BrokerURL)
@@ -83,7 +83,7 @@ func (c *Client) publish(r chan *runResults) {
}
payload, err := c.SendMsg(&m)
if err != nil {
log.Fatalf("Failed to marshal payload - %s", err.Error())
errChan <- fmt.Errorf("Failed to marshal payload - %s", err.Error())
}
for i := 0; i < c.MsgCount; i++ {
@@ -165,10 +165,10 @@ func (c *Client) connect() error {
return nil
}
func checkConnection(broker string, timeoutSecs int) {
func checkConnection(broker string, timeoutSecs int) error {
s := strings.Split(broker, ":")
if len(s) != 3 {
log.Fatalf("Wrong host address format")
return errors.New("Wrong host address format")
}
network := s[0]
@@ -186,14 +186,15 @@ func checkConnection(broker string, timeoutSecs int) {
defer conClose()
if err, ok := err.(*net.OpError); ok && err.Timeout() {
log.Fatalf("Timeout error: %s\n", err.Error())
return fmt.Errorf("Timeout error: %s\n", err.Error())
}
if err != nil {
log.Fatalf("Error: %s\n", err.Error())
return fmt.Errorf("Error: %s\n", err.Error())
}
log.Printf("Connection to %s://%s:%s looks OK\n", network, host, port)
return nil
}
func arr(a []*float64) []float64 {
+3 -1
View File
@@ -35,7 +35,9 @@ Complete documentation is available at https://docs.mainflux.io`,
}
}
bench.Benchmark(bconf)
if err := bench.Benchmark(bconf); err != nil {
log.Fatal(err)
}
},
}
+3 -1
View File
@@ -20,7 +20,9 @@ func main() {
Long: `Tool for provisioning series of Mainflux channels and things and connecting them together.
Complete documentation is available at https://docs.mainflux.io`,
Run: func(cmd *cobra.Command, args []string) {
provision.Provision(pconf)
if err := provision.Provision(pconf); err != nil {
log.Fatal(err)
}
},
}
+15 -16
View File
@@ -50,7 +50,7 @@ type Config struct {
}
// Provision - function that does actual provisiong.
func Provision(conf Config) {
func Provision(conf Config) error {
const (
rsaBits = 4096
ttl = "2400h"
@@ -84,8 +84,7 @@ func Provision(conf Config) {
// Create new user
if _, err := s.CreateUser(user, ""); err != nil {
log.Fatalf("Unable to create new user: %s", err.Error())
return
return fmt.Errorf("Unable to create new user: %s", err.Error())
}
@@ -94,8 +93,7 @@ func Provision(conf Config) {
// Login user
token, err := s.CreateToken(user)
if err != nil {
log.Fatalf("Unable to login user: %s", err.Error())
return
return fmt.Errorf("Unable to login user: %s", err.Error())
}
var tlsCert tls.Certificate
@@ -104,22 +102,22 @@ func Provision(conf Config) {
if conf.SSL {
tlsCert, err = tls.LoadX509KeyPair(conf.CA, conf.CAKey)
if err != nil {
log.Fatalf("Failed to load CA cert")
return fmt.Errorf("Failed to load CA cert")
}
b, err := os.ReadFile(conf.CA)
if err != nil {
log.Fatalf("Failed to load CA cert")
return fmt.Errorf("Failed to load CA cert")
}
block, _ := pem.Decode(b)
if block == nil {
log.Fatalf("No PEM data found, failed to decode CA")
return fmt.Errorf("No PEM data found, failed to decode CA")
}
caCert, err = x509.ParseCertificate(block.Bytes)
if err != nil {
log.Fatalf("Failed to decode certificate - %s", err.Error())
return fmt.Errorf("Failed to decode certificate - %s", err.Error())
}
}
@@ -138,12 +136,12 @@ func Provision(conf Config) {
things, err = s.CreateThings(things, token.AccessToken)
if err != nil {
log.Fatalf("Failed to create the things: %s", err.Error())
return fmt.Errorf("Failed to create the things: %s", err.Error())
}
channels, err = s.CreateChannels(channels, token.AccessToken)
if err != nil {
log.Fatalf("Failed to create the chennels: %s", err.Error())
return fmt.Errorf("Failed to create the chennels: %s", err.Error())
}
for _, t := range things {
@@ -165,14 +163,14 @@ func Provision(conf Config) {
notBefore := time.Now()
validFor, err := time.ParseDuration(ttl)
if err != nil {
log.Fatalf("Failed to set date %v", validFor)
return fmt.Errorf("Failed to set date %v", validFor)
}
notAfter := notBefore.Add(validFor)
serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128)
serialNumber, err := rand.Int(rand.Reader, serialNumberLimit)
if err != nil {
log.Fatalf("Failed to generate serial number: %s", err)
return fmt.Errorf("Failed to generate serial number: %s", err)
}
tmpl := x509.Certificate{
@@ -192,7 +190,7 @@ func Provision(conf Config) {
derBytes, err := x509.CreateCertificate(rand.Reader, &tmpl, caCert, publicKey(priv), tlsCert.PrivateKey)
if err != nil {
log.Fatalf("Failed to create certificate: %s", err)
return fmt.Errorf("Failed to create certificate: %s", err)
}
var bw, keyOut bytes.Buffer
@@ -200,13 +198,13 @@ func Provision(conf Config) {
buffKeyOut := bufio.NewWriter(&keyOut)
if err := pem.Encode(buffWriter, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes}); err != nil {
log.Fatalf("Failed to write cert pem data: %s", err)
return fmt.Errorf("Failed to write cert pem data: %s", err)
}
buffWriter.Flush()
cert = bw.String()
if err := pem.Encode(buffKeyOut, pemBlockForKey(priv)); err != nil {
log.Fatalf("Failed to write key pem data: %s", err)
return fmt.Errorf("Failed to write key pem data: %s", err)
}
buffKeyOut.Flush()
key = keyOut.String()
@@ -234,6 +232,7 @@ func Provision(conf Config) {
if err := s.Connect(conIDs, token.AccessToken); err != nil {
log.Fatalf("Failed to connect things %s to channels %s: %s", conIDs.ThingIDs, conIDs.ChannelIDs, err)
}
return nil
}
func publicKey(priv interface{}) interface{} {
+2 -2
View File
@@ -23,7 +23,7 @@ func New(url string, c *email.Config) (clients.Emailer, error) {
return &emailer{resetURL: url, agent: e}, err
}
func (e *emailer) SendPasswordReset(To []string, host, user, token string) error {
func (e *emailer) SendPasswordReset(to []string, host, user, token string) error {
url := fmt.Sprintf("%s%s?token=%s", host, e.resetURL, token)
return e.agent.Send(To, "", "Password Reset Request", "", user, url, "")
return e.agent.Send(to, "", "Password Reset Request", "", user, url, "")
}
+1 -4
View File
@@ -45,10 +45,7 @@ func (svc authServiceMock) Identify(ctx context.Context, in *policies.Token, opt
func (svc authServiceMock) Issue(ctx context.Context, in *policies.IssueReq, opts ...grpc.CallOption) (*policies.Token, error) {
if id, ok := svc.users[in.GetEmail()]; ok {
switch in.Type {
default:
return &policies.Token{Value: id}, nil
}
return &policies.Token{Value: id}, nil
}
return nil, errors.ErrAuthentication
}
+1 -1
View File
@@ -200,7 +200,7 @@ func (svc service) ListClients(ctx context.Context, token string, pm mfclients.P
if client.ID == id {
clients.Clients = append(clients.Clients[:i], clients.Clients[i+1:]...)
if clients.Total != 0 {
clients.Total = clients.Total - 1
clients.Total--
}
}
}
+2 -2
View File
@@ -65,8 +65,8 @@ func (repo tokenRepo) Issue(ctx context.Context, claim Claims) (Token, error) {
}
return Token{
AccessToken: string(signedAccessToken[:]),
RefreshToken: string(signedRefreshToken[:]),
AccessToken: string(signedAccessToken),
RefreshToken: string(signedRefreshToken),
AccessType: "Bearer",
}, nil
}
-1
View File
@@ -179,7 +179,6 @@ func encodeError(err error) error {
case errors.Contains(err, errors.ErrMalformedEntity),
err == apiutil.ErrInvalidAuthKey,
err == apiutil.ErrMissingID,
err == apiutil.ErrBearerToken,
err == apiutil.ErrMissingPolicySub,
err == apiutil.ErrMissingPolicyObj,
err == apiutil.ErrMalformedPolicyAct,
+1 -1
View File
@@ -94,7 +94,7 @@ func parseSubTopic(subtopic string) (string, error) {
return "", errMalformedSubtopic
}
subtopic = strings.Replace(subtopic, "/", ".", -1)
subtopic = strings.ReplaceAll(subtopic, "/", ".")
elems := strings.Split(subtopic, ".")
filteredElems := []string{}