mirror of
https://github.com/absmach/magistrala.git
synced 2026-06-23 04:10:28 +00:00
MF-379 - Log Level Option (#402)
* Adding an option to select log level Signed-off-by: Michael Finley <Michael.Finley@target.com> * making error default log level Signed-off-by: Michael Finley <Michael.Finley@target.com> * removing go-kit/levels and expanding levels wrapper Signed-off-by: Michael Finley <Michael.Finley@target.com> * refactoring test cases and using log.fatal Signed-off-by: Michael Finley <Michael.Finley@target.com> * logger.new no longer accpets enum and now accepts string for level Signed-off-by: Michael Finley <Michael.Finley@target.com> * level_test.go refactor to compare error Signed-off-by: Michael Finley <Michael.Finley@target.com> * Updating the ws README Signed-off-by: Michael Finley <Michael.Finley@target.com> * Adding log level for mqtt Adapter Signed-off-by: Michael Finley <Michael.Finley@target.com>
This commit is contained in:
committed by
Aleksandar Novaković
parent
3de34062db
commit
6600d26ef1
@@ -9,6 +9,7 @@ package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
@@ -18,7 +19,7 @@ import (
|
||||
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
|
||||
"github.com/gocql/gocql"
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/readers"
|
||||
"github.com/mainflux/mainflux/readers/api"
|
||||
"github.com/mainflux/mainflux/readers/cassandra"
|
||||
@@ -30,11 +31,13 @@ import (
|
||||
const (
|
||||
sep = ","
|
||||
|
||||
defLogLevel = "error"
|
||||
defPort = "8180"
|
||||
defCluster = "127.0.0.1"
|
||||
defKeyspace = "mainflux"
|
||||
defThingsURL = "localhost:8181"
|
||||
|
||||
envLogLevel = "MF_CASSANDRA_READER_LOG_LEVEL"
|
||||
envPort = "MF_CASSANDRA_READER_PORT"
|
||||
envCluster = "MF_CASSANDRA_READER_DB_CLUSTER"
|
||||
envKeyspace = "MF_CASSANDRA_READER_DB_KEYSPACE"
|
||||
@@ -42,6 +45,7 @@ const (
|
||||
)
|
||||
|
||||
type config struct {
|
||||
logLevel string
|
||||
port string
|
||||
cluster string
|
||||
keyspace string
|
||||
@@ -51,7 +55,10 @@ type config struct {
|
||||
func main() {
|
||||
cfg := loadConfig()
|
||||
|
||||
logger := log.New(os.Stdout)
|
||||
logger, err := logger.New(os.Stdout, cfg.logLevel)
|
||||
if err != nil {
|
||||
log.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
session := connectToCassandra(cfg.cluster, cfg.keyspace, logger)
|
||||
defer session.Close()
|
||||
@@ -72,12 +79,13 @@ func main() {
|
||||
errs <- fmt.Errorf("%s", <-c)
|
||||
}()
|
||||
|
||||
err := <-errs
|
||||
err = <-errs
|
||||
logger.Error(fmt.Sprintf("Cassandra reader service terminated: %s", err))
|
||||
}
|
||||
|
||||
func loadConfig() config {
|
||||
return config{
|
||||
logLevel: mainflux.Env(envLogLevel, defLogLevel),
|
||||
port: mainflux.Env(envPort, defPort),
|
||||
cluster: mainflux.Env(envCluster, defCluster),
|
||||
keyspace: mainflux.Env(envKeyspace, defKeyspace),
|
||||
@@ -85,7 +93,7 @@ func loadConfig() config {
|
||||
}
|
||||
}
|
||||
|
||||
func connectToCassandra(cluster, keyspace string, logger log.Logger) *gocql.Session {
|
||||
func connectToCassandra(cluster, keyspace string, logger logger.Logger) *gocql.Session {
|
||||
session, err := cassandra.Connect(strings.Split(cluster, sep), keyspace)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to connect to Cassandra cluster: %s", err))
|
||||
@@ -95,7 +103,7 @@ func connectToCassandra(cluster, keyspace string, logger log.Logger) *gocql.Sess
|
||||
return session
|
||||
}
|
||||
|
||||
func connectToThings(url string, logger log.Logger) *grpc.ClientConn {
|
||||
func connectToThings(url string, logger logger.Logger) *grpc.ClientConn {
|
||||
conn, err := grpc.Dial(url, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to connect to things service: %s", err))
|
||||
@@ -105,7 +113,7 @@ func connectToThings(url string, logger log.Logger) *grpc.ClientConn {
|
||||
return conn
|
||||
}
|
||||
|
||||
func newService(session *gocql.Session, logger log.Logger) readers.MessageRepository {
|
||||
func newService(session *gocql.Session, logger logger.Logger) readers.MessageRepository {
|
||||
repo := cassandra.New(session)
|
||||
repo = api.LoggingMiddleware(repo, logger)
|
||||
repo = api.MetricsMiddleware(
|
||||
@@ -127,7 +135,7 @@ func newService(session *gocql.Session, logger log.Logger) readers.MessageReposi
|
||||
return repo
|
||||
}
|
||||
|
||||
func startHTTPServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient, port string, errs chan error, logger log.Logger) {
|
||||
func startHTTPServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient, port string, errs chan error, logger logger.Logger) {
|
||||
p := fmt.Sprintf(":%s", port)
|
||||
logger.Info(fmt.Sprintf("Cassandra reader service started, exposed port %s", port))
|
||||
errs <- http.ListenAndServe(p, api.MakeHandler(repo, tc, "cassandra-reader"))
|
||||
|
||||
@@ -9,6 +9,7 @@ package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
@@ -18,10 +19,10 @@ import (
|
||||
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
|
||||
"github.com/gocql/gocql"
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/writers"
|
||||
"github.com/mainflux/mainflux/writers/cassandra"
|
||||
nats "github.com/nats-io/go-nats"
|
||||
"github.com/nats-io/go-nats"
|
||||
stdprometheus "github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
@@ -30,11 +31,13 @@ const (
|
||||
sep = ","
|
||||
|
||||
defNatsURL = nats.DefaultURL
|
||||
defLogLevel = "error"
|
||||
defPort = "8180"
|
||||
defCluster = "127.0.0.1"
|
||||
defKeyspace = "mainflux"
|
||||
|
||||
envNatsURL = "MF_NATS_URL"
|
||||
envLogLevel = "MF_CASSANDRA_WRITER_LOG_LEVEL"
|
||||
envPort = "MF_CASSANDRA_WRITER_PORT"
|
||||
envCluster = "MF_CASSANDRA_WRITER_DB_CLUSTER"
|
||||
envKeyspace = "MF_CASSANDRA_WRITER_DB_KEYSPACE"
|
||||
@@ -42,6 +45,7 @@ const (
|
||||
|
||||
type config struct {
|
||||
natsURL string
|
||||
logLevel string
|
||||
port string
|
||||
cluster string
|
||||
keyspace string
|
||||
@@ -50,7 +54,10 @@ type config struct {
|
||||
func main() {
|
||||
cfg := loadConfig()
|
||||
|
||||
logger := log.New(os.Stdout)
|
||||
logger, err := logger.New(os.Stdout, cfg.logLevel)
|
||||
if err != nil {
|
||||
log.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
nc := connectToNATS(cfg.natsURL, logger)
|
||||
defer nc.Close()
|
||||
@@ -73,20 +80,21 @@ func main() {
|
||||
errs <- fmt.Errorf("%s", <-c)
|
||||
}()
|
||||
|
||||
err := <-errs
|
||||
err = <-errs
|
||||
logger.Error(fmt.Sprintf("Cassandra writer service terminated: %s", err))
|
||||
}
|
||||
|
||||
func loadConfig() config {
|
||||
return config{
|
||||
natsURL: mainflux.Env(envNatsURL, defNatsURL),
|
||||
logLevel: mainflux.Env(envLogLevel, defLogLevel),
|
||||
port: mainflux.Env(envPort, defPort),
|
||||
cluster: mainflux.Env(envCluster, defCluster),
|
||||
keyspace: mainflux.Env(envKeyspace, defKeyspace),
|
||||
}
|
||||
}
|
||||
|
||||
func connectToNATS(url string, logger log.Logger) *nats.Conn {
|
||||
func connectToNATS(url string, logger logger.Logger) *nats.Conn {
|
||||
nc, err := nats.Connect(url)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
|
||||
@@ -96,7 +104,7 @@ func connectToNATS(url string, logger log.Logger) *nats.Conn {
|
||||
return nc
|
||||
}
|
||||
|
||||
func connectToCassandra(cluster, keyspace string, logger log.Logger) *gocql.Session {
|
||||
func connectToCassandra(cluster, keyspace string, logger logger.Logger) *gocql.Session {
|
||||
session, err := cassandra.Connect(strings.Split(cluster, sep), keyspace)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to connect to Cassandra cluster: %s", err))
|
||||
@@ -106,7 +114,7 @@ func connectToCassandra(cluster, keyspace string, logger log.Logger) *gocql.Sess
|
||||
return session
|
||||
}
|
||||
|
||||
func newService(session *gocql.Session, logger log.Logger) writers.MessageRepository {
|
||||
func newService(session *gocql.Session, logger logger.Logger) writers.MessageRepository {
|
||||
repo := cassandra.New(session)
|
||||
repo = writers.LoggingMiddleware(repo, logger)
|
||||
repo = writers.MetricsMiddleware(
|
||||
@@ -128,7 +136,7 @@ func newService(session *gocql.Session, logger log.Logger) writers.MessageReposi
|
||||
return repo
|
||||
}
|
||||
|
||||
func startHTTPServer(port string, errs chan error, logger log.Logger) {
|
||||
func startHTTPServer(port string, errs chan error, logger logger.Logger) {
|
||||
p := fmt.Sprintf(":%s", port)
|
||||
logger.Info(fmt.Sprintf("Cassandra writer service started, exposed port %s", port))
|
||||
errs <- http.ListenAndServe(p, cassandra.MakeHandler())
|
||||
|
||||
+21
-7
@@ -9,6 +9,7 @@ package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
@@ -19,7 +20,7 @@ import (
|
||||
adapter "github.com/mainflux/mainflux/http"
|
||||
"github.com/mainflux/mainflux/http/api"
|
||||
"github.com/mainflux/mainflux/http/nats"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/logger"
|
||||
thingsapi "github.com/mainflux/mainflux/things/api/grpc"
|
||||
broker "github.com/nats-io/go-nats"
|
||||
stdprometheus "github.com/prometheus/client_golang/prometheus"
|
||||
@@ -28,9 +29,11 @@ import (
|
||||
|
||||
const (
|
||||
defPort string = "8180"
|
||||
defLogLevel string = "error"
|
||||
defNatsURL string = broker.DefaultURL
|
||||
defThingsURL string = "localhost:8181"
|
||||
envPort string = "MF_HTTP_ADAPTER_PORT"
|
||||
envLogLevel string = "MF_HTTP_ADAPTER_LOG_LEVEL"
|
||||
envNatsURL string = "MF_NATS_URL"
|
||||
envThingsURL string = "MF_THINGS_URL"
|
||||
)
|
||||
@@ -38,17 +41,18 @@ const (
|
||||
type config struct {
|
||||
ThingsURL string
|
||||
NatsURL string
|
||||
LogLevel string
|
||||
Port string
|
||||
}
|
||||
|
||||
func main() {
|
||||
cfg := config{
|
||||
ThingsURL: mainflux.Env(envThingsURL, defThingsURL),
|
||||
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
|
||||
Port: mainflux.Env(envPort, defPort),
|
||||
}
|
||||
|
||||
logger := log.New(os.Stdout)
|
||||
cfg := loadConfig()
|
||||
|
||||
logger, err := logger.New(os.Stdout, cfg.LogLevel)
|
||||
if err != nil {
|
||||
log.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
nc, err := broker.Connect(cfg.NatsURL)
|
||||
if err != nil {
|
||||
@@ -102,3 +106,13 @@ func main() {
|
||||
err = <-errs
|
||||
logger.Error(fmt.Sprintf("HTTP adapter terminated: %s", err))
|
||||
}
|
||||
|
||||
func loadConfig() config {
|
||||
return config{
|
||||
ThingsURL: mainflux.Env(envThingsURL, defThingsURL),
|
||||
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
|
||||
LogLevel: mainflux.Env(envLogLevel, defLogLevel),
|
||||
Port: mainflux.Env(envPort, defPort),
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
@@ -10,7 +11,7 @@ import (
|
||||
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
|
||||
influxdata "github.com/influxdata/influxdb/client/v2"
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/readers"
|
||||
"github.com/mainflux/mainflux/readers/api"
|
||||
"github.com/mainflux/mainflux/readers/influxdb"
|
||||
@@ -21,6 +22,7 @@ import (
|
||||
|
||||
const (
|
||||
defThingsURL = "localhost:8181"
|
||||
defLogLevel = "error"
|
||||
defPort = "8180"
|
||||
defDBName = "mainflux"
|
||||
defDBHost = "localhost"
|
||||
@@ -29,6 +31,7 @@ const (
|
||||
defDBPass = "mainflux"
|
||||
|
||||
envThingsURL = "MF_THINGS_URL"
|
||||
envLogLevel = "MF_INFLUX_READER_LOG_LEVEL"
|
||||
envPort = "MF_INFLUX_READER_PORT"
|
||||
envDBName = "MF_INFLUX_READER_DB_NAME"
|
||||
envDBHost = "MF_INFLUX_READER_DB_HOST"
|
||||
@@ -39,6 +42,7 @@ const (
|
||||
|
||||
type config struct {
|
||||
ThingsURL string
|
||||
LogLevel string
|
||||
Port string
|
||||
DBName string
|
||||
DBHost string
|
||||
@@ -49,8 +53,10 @@ type config struct {
|
||||
|
||||
func main() {
|
||||
cfg, clientCfg := loadConfigs()
|
||||
logger := log.New(os.Stdout)
|
||||
|
||||
logger, err := logger.New(os.Stdout, cfg.LogLevel)
|
||||
if err != nil {
|
||||
log.Fatalf(err.Error())
|
||||
}
|
||||
conn := connectToThings(cfg.ThingsURL, logger)
|
||||
defer conn.Close()
|
||||
|
||||
@@ -85,6 +91,7 @@ func main() {
|
||||
func loadConfigs() (config, influxdata.HTTPConfig) {
|
||||
cfg := config{
|
||||
ThingsURL: mainflux.Env(envThingsURL, defThingsURL),
|
||||
LogLevel: mainflux.Env(envLogLevel, defLogLevel),
|
||||
Port: mainflux.Env(envPort, defPort),
|
||||
DBName: mainflux.Env(envDBName, defDBName),
|
||||
DBHost: mainflux.Env(envDBHost, defDBHost),
|
||||
@@ -102,7 +109,7 @@ func loadConfigs() (config, influxdata.HTTPConfig) {
|
||||
return cfg, clientCfg
|
||||
}
|
||||
|
||||
func connectToThings(url string, logger log.Logger) *grpc.ClientConn {
|
||||
func connectToThings(url string, logger logger.Logger) *grpc.ClientConn {
|
||||
conn, err := grpc.Dial(url, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to connect to things service: %s", err))
|
||||
@@ -112,7 +119,7 @@ func connectToThings(url string, logger log.Logger) *grpc.ClientConn {
|
||||
return conn
|
||||
}
|
||||
|
||||
func newService(client influxdata.Client, logger log.Logger) readers.MessageRepository {
|
||||
func newService(client influxdata.Client, logger logger.Logger) readers.MessageRepository {
|
||||
repo, _ := influxdb.New(client, "mainflux")
|
||||
repo = api.LoggingMiddleware(repo, logger)
|
||||
repo = api.MetricsMiddleware(
|
||||
@@ -134,7 +141,7 @@ func newService(client influxdata.Client, logger log.Logger) readers.MessageRepo
|
||||
return repo
|
||||
}
|
||||
|
||||
func startHTTPServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient, port string, logger log.Logger, errs chan error) {
|
||||
func startHTTPServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient, port string, logger logger.Logger, errs chan error) {
|
||||
p := fmt.Sprintf(":%s", port)
|
||||
logger.Info(fmt.Sprintf("InfluxDB reader service started, exposed port %s", port))
|
||||
errs <- http.ListenAndServe(p, api.MakeHandler(repo, tc, "influxdb-reader"))
|
||||
|
||||
+37
-31
@@ -9,6 +9,7 @@ package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
@@ -19,10 +20,10 @@ import (
|
||||
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
|
||||
influxdata "github.com/influxdata/influxdb/client/v2"
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/writers"
|
||||
"github.com/mainflux/mainflux/writers/influxdb"
|
||||
nats "github.com/nats-io/go-nats"
|
||||
"github.com/nats-io/go-nats"
|
||||
stdprometheus "github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
@@ -30,6 +31,7 @@ const (
|
||||
queue = "influxdb-writer"
|
||||
|
||||
defNatsURL = nats.DefaultURL
|
||||
defLogLevel = "error"
|
||||
defPort = "8180"
|
||||
defBatchSize = "5000"
|
||||
defBatchTimeout = "5"
|
||||
@@ -40,6 +42,7 @@ const (
|
||||
defDBPass = "mainflux"
|
||||
|
||||
envNatsURL = "MF_NATS_URL"
|
||||
envLogLevel = "MF_INFLUX_WRITER_LOG_LEVEL"
|
||||
envPort = "MF_INFLUX_WRITER_PORT"
|
||||
envBatchSize = "MF_INFLUX_WRITER_BATCH_SIZE"
|
||||
envBatchTimeout = "MF_INFLUX_WRITER_BATCH_TIMEOUT"
|
||||
@@ -52,9 +55,10 @@ const (
|
||||
|
||||
type config struct {
|
||||
NatsURL string
|
||||
LogLevel string
|
||||
Port string
|
||||
BatchSize int
|
||||
BatchTimeout int
|
||||
BatchSize string
|
||||
BatchTimeout string
|
||||
DBName string
|
||||
DBHost string
|
||||
DBPort string
|
||||
@@ -63,9 +67,11 @@ type config struct {
|
||||
}
|
||||
|
||||
func main() {
|
||||
logger := log.New(os.Stdout)
|
||||
cfg, clientCfg := loadConfigs(logger)
|
||||
|
||||
cfg, clientCfg := loadConfigs()
|
||||
logger, err := logger.New(os.Stdout, cfg.LogLevel)
|
||||
if err != nil {
|
||||
log.Fatalf(err.Error())
|
||||
}
|
||||
nc, err := nats.Connect(cfg.NatsURL)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
|
||||
@@ -80,8 +86,20 @@ func main() {
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
timeout := time.Duration(cfg.BatchTimeout) * time.Second
|
||||
repo, err := influxdb.New(client, cfg.DBName, cfg.BatchSize, timeout)
|
||||
batchTimeout, err := strconv.Atoi(cfg.BatchTimeout)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Invalid value for batch timeout: %s", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
batchSize, err := strconv.Atoi(cfg.BatchSize)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Invalid value of batch size: %s", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
timeout := time.Duration(batchTimeout) * time.Second
|
||||
repo, err := influxdb.New(client, cfg.DBName, batchSize, timeout)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to create InfluxDB writer: %s", err))
|
||||
os.Exit(1)
|
||||
@@ -108,28 +126,16 @@ func main() {
|
||||
logger.Error(fmt.Sprintf("InfluxDB writer service terminated: %s", err))
|
||||
}
|
||||
|
||||
func loadConfigs(logger log.Logger) (config, influxdata.HTTPConfig) {
|
||||
func loadConfigs() (config, influxdata.HTTPConfig) {
|
||||
cfg := config{
|
||||
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
|
||||
Port: mainflux.Env(envPort, defPort),
|
||||
DBName: mainflux.Env(envDBName, defDBName),
|
||||
DBHost: mainflux.Env(envDBHost, defDBHost),
|
||||
DBPort: mainflux.Env(envDBPort, defDBPort),
|
||||
DBUser: mainflux.Env(envDBUser, defDBUser),
|
||||
DBPass: mainflux.Env(envDBPass, defDBPass),
|
||||
}
|
||||
|
||||
var err error
|
||||
cfg.BatchSize, err = strconv.Atoi(mainflux.Env(envBatchSize, defBatchSize))
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Invalid value of batch size: %s", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
cfg.BatchTimeout, err = strconv.Atoi(mainflux.Env(envBatchTimeout, defBatchTimeout))
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Invalid value for batch timeout: %s", err))
|
||||
os.Exit(1)
|
||||
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
|
||||
LogLevel: mainflux.Env(envLogLevel, defLogLevel),
|
||||
Port: mainflux.Env(envPort, defPort),
|
||||
DBName: mainflux.Env(envDBName, defDBName),
|
||||
DBHost: mainflux.Env(envDBHost, defDBHost),
|
||||
DBPort: mainflux.Env(envDBPort, defDBPort),
|
||||
DBUser: mainflux.Env(envDBUser, defDBUser),
|
||||
DBPass: mainflux.Env(envDBPass, defDBPass),
|
||||
}
|
||||
|
||||
clientCfg := influxdata.HTTPConfig{
|
||||
@@ -159,7 +165,7 @@ func makeMetrics() (*kitprometheus.Counter, *kitprometheus.Summary) {
|
||||
return counter, latency
|
||||
}
|
||||
|
||||
func startHTTPService(port string, logger log.Logger, errs chan error) {
|
||||
func startHTTPService(port string, logger logger.Logger, errs chan error) {
|
||||
p := fmt.Sprintf(":%s", port)
|
||||
logger.Info(fmt.Sprintf("InfluxDB writer service started, exposed port %s", p))
|
||||
errs <- http.ListenAndServe(p, influxdb.MakeHandler())
|
||||
|
||||
@@ -10,6 +10,7 @@ package main
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
@@ -17,7 +18,7 @@ import (
|
||||
|
||||
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/readers"
|
||||
"github.com/mainflux/mainflux/readers/api"
|
||||
"github.com/mainflux/mainflux/readers/mongodb"
|
||||
@@ -29,12 +30,14 @@ import (
|
||||
|
||||
const (
|
||||
defThingsURL = "localhost:8181"
|
||||
defLogLevel = "error"
|
||||
defPort = "8180"
|
||||
defDBName = "mainflux"
|
||||
defDBHost = "localhost"
|
||||
defDBPort = "27017"
|
||||
|
||||
envThingsURL = "MF_THINGS_URL"
|
||||
envLogLevel = "MF_MONGO_READER_LOG_LEVEL"
|
||||
envPort = "MF_MONGO_READER_PORT"
|
||||
envDBName = "MF_MONGO_READER_DB_NAME"
|
||||
envDBHost = "MF_MONGO_READER_DB_HOST"
|
||||
@@ -43,6 +46,7 @@ const (
|
||||
|
||||
type config struct {
|
||||
thingsURL string
|
||||
logLevel string
|
||||
port string
|
||||
dbName string
|
||||
dbHost string
|
||||
@@ -51,8 +55,10 @@ type config struct {
|
||||
|
||||
func main() {
|
||||
cfg := loadConfigs()
|
||||
logger := log.New(os.Stdout)
|
||||
|
||||
logger, err := logger.New(os.Stdout, cfg.logLevel)
|
||||
if err != nil {
|
||||
log.Fatalf(err.Error())
|
||||
}
|
||||
conn := connectToThings(cfg.thingsURL, logger)
|
||||
defer conn.Close()
|
||||
|
||||
@@ -71,13 +77,14 @@ func main() {
|
||||
|
||||
go startHTTPServer(repo, tc, cfg.port, logger, errs)
|
||||
|
||||
err := <-errs
|
||||
err = <-errs
|
||||
logger.Error(fmt.Sprintf("MongoDB reader service terminated: %s", err))
|
||||
}
|
||||
|
||||
func loadConfigs() config {
|
||||
return config{
|
||||
thingsURL: mainflux.Env(envThingsURL, defThingsURL),
|
||||
logLevel: mainflux.Env(envLogLevel, defLogLevel),
|
||||
port: mainflux.Env(envPort, defPort),
|
||||
dbName: mainflux.Env(envDBName, defDBName),
|
||||
dbHost: mainflux.Env(envDBHost, defDBHost),
|
||||
@@ -85,7 +92,7 @@ func loadConfigs() config {
|
||||
}
|
||||
}
|
||||
|
||||
func connectToMongoDB(host, port, name string, logger log.Logger) *mongo.Database {
|
||||
func connectToMongoDB(host, port, name string, logger logger.Logger) *mongo.Database {
|
||||
client, err := mongo.Connect(context.Background(), fmt.Sprintf("mongodb://%s:%s", host, port), nil)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to connect to database: %s", err))
|
||||
@@ -95,7 +102,7 @@ func connectToMongoDB(host, port, name string, logger log.Logger) *mongo.Databas
|
||||
return client.Database(name)
|
||||
}
|
||||
|
||||
func connectToThings(url string, logger log.Logger) *grpc.ClientConn {
|
||||
func connectToThings(url string, logger logger.Logger) *grpc.ClientConn {
|
||||
conn, err := grpc.Dial(url, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to connect to things service: %s", err))
|
||||
@@ -105,7 +112,7 @@ func connectToThings(url string, logger log.Logger) *grpc.ClientConn {
|
||||
return conn
|
||||
}
|
||||
|
||||
func newService(db *mongo.Database, logger log.Logger) readers.MessageRepository {
|
||||
func newService(db *mongo.Database, logger logger.Logger) readers.MessageRepository {
|
||||
repo := mongodb.New(db)
|
||||
repo = api.LoggingMiddleware(repo, logger)
|
||||
repo = api.MetricsMiddleware(
|
||||
@@ -127,7 +134,7 @@ func newService(db *mongo.Database, logger log.Logger) readers.MessageRepository
|
||||
return repo
|
||||
}
|
||||
|
||||
func startHTTPServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient, port string, logger log.Logger, errs chan error) {
|
||||
func startHTTPServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient, port string, logger logger.Logger, errs chan error) {
|
||||
p := fmt.Sprintf(":%s", port)
|
||||
logger.Info(fmt.Sprintf("Mongo reader service started, exposed port %s", port))
|
||||
errs <- http.ListenAndServe(p, api.MakeHandler(repo, tc, "cassandra-reader"))
|
||||
|
||||
+33
-26
@@ -10,6 +10,7 @@ package main
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
@@ -17,42 +18,47 @@ import (
|
||||
|
||||
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/writers"
|
||||
mongodb "github.com/mainflux/mainflux/writers/mongodb"
|
||||
"github.com/mainflux/mainflux/writers/mongodb"
|
||||
"github.com/mongodb/mongo-go-driver/mongo"
|
||||
nats "github.com/nats-io/go-nats"
|
||||
"github.com/nats-io/go-nats"
|
||||
stdprometheus "github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
queue = "mongodb-writer"
|
||||
|
||||
defNatsURL = nats.DefaultURL
|
||||
defPort = "8180"
|
||||
defDBName = "mainflux"
|
||||
defDBHost = "localhost"
|
||||
defDBPort = "27017"
|
||||
defNatsURL = nats.DefaultURL
|
||||
defLogLevel = "error"
|
||||
defPort = "8180"
|
||||
defDBName = "mainflux"
|
||||
defDBHost = "localhost"
|
||||
defDBPort = "27017"
|
||||
|
||||
envNatsURL = "MF_NATS_URL"
|
||||
envPort = "MF_MONGO_WRITER_PORT"
|
||||
envDBName = "MF_MONGO_WRITER_DB_NAME"
|
||||
envDBHost = "MF_MONGO_WRITER_DB_HOST"
|
||||
envDBPort = "MF_MONGO_WRITER_DB_PORT"
|
||||
envNatsURL = "MF_NATS_URL"
|
||||
envLogLevel = "MF_MONGO_WRITER_LOG_LEVEL"
|
||||
envPort = "MF_MONGO_WRITER_PORT"
|
||||
envDBName = "MF_MONGO_WRITER_DB_NAME"
|
||||
envDBHost = "MF_MONGO_WRITER_DB_HOST"
|
||||
envDBPort = "MF_MONGO_WRITER_DB_PORT"
|
||||
)
|
||||
|
||||
type config struct {
|
||||
NatsURL string
|
||||
Port string
|
||||
DBName string
|
||||
DBHost string
|
||||
DBPort string
|
||||
NatsURL string
|
||||
LogLevel string
|
||||
Port string
|
||||
DBName string
|
||||
DBHost string
|
||||
DBPort string
|
||||
}
|
||||
|
||||
func main() {
|
||||
cfg := loadConfigs()
|
||||
logger := log.New(os.Stdout)
|
||||
|
||||
logger, err := logger.New(os.Stdout, cfg.LogLevel)
|
||||
if err != nil {
|
||||
log.Fatalf(err.Error())
|
||||
}
|
||||
nc, err := nats.Connect(cfg.NatsURL)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
|
||||
@@ -92,11 +98,12 @@ func main() {
|
||||
|
||||
func loadConfigs() config {
|
||||
return config{
|
||||
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
|
||||
Port: mainflux.Env(envPort, defPort),
|
||||
DBName: mainflux.Env(envDBName, defDBName),
|
||||
DBHost: mainflux.Env(envDBHost, defDBHost),
|
||||
DBPort: mainflux.Env(envDBPort, defDBPort),
|
||||
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
|
||||
LogLevel: mainflux.Env(envLogLevel, defLogLevel),
|
||||
Port: mainflux.Env(envPort, defPort),
|
||||
DBName: mainflux.Env(envDBName, defDBName),
|
||||
DBHost: mainflux.Env(envDBHost, defDBHost),
|
||||
DBPort: mainflux.Env(envDBPort, defDBPort),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -118,7 +125,7 @@ func makeMetrics() (*kitprometheus.Counter, *kitprometheus.Summary) {
|
||||
return counter, latency
|
||||
}
|
||||
|
||||
func startHTTPService(port string, logger log.Logger, errs chan error) {
|
||||
func startHTTPService(port string, logger logger.Logger, errs chan error) {
|
||||
p := fmt.Sprintf(":%s", port)
|
||||
logger.Info(fmt.Sprintf("Mongodb writer service started, exposed port %s", p))
|
||||
errs <- http.ListenAndServe(p, mongodb.MakeHandler())
|
||||
|
||||
+24
-13
@@ -9,13 +9,14 @@ package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/normalizer"
|
||||
"github.com/mainflux/mainflux/normalizer/api"
|
||||
"github.com/mainflux/mainflux/normalizer/nats"
|
||||
@@ -26,25 +27,27 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
defNatsURL string = broker.DefaultURL
|
||||
defPort string = "8180"
|
||||
envNatsURL string = "MF_NATS_URL"
|
||||
envPort string = "MF_NORMALIZER_PORT"
|
||||
defNatsURL string = broker.DefaultURL
|
||||
defLogLevel string = "error"
|
||||
defPort string = "8180"
|
||||
envNatsURL string = "MF_NATS_URL"
|
||||
envLogLevel string = "MF_NORMALIZER_LOG_LEVEL"
|
||||
envPort string = "MF_NORMALIZER_PORT"
|
||||
)
|
||||
|
||||
type config struct {
|
||||
NatsURL string
|
||||
Port string
|
||||
NatsURL string
|
||||
LogLevel string
|
||||
Port string
|
||||
}
|
||||
|
||||
func main() {
|
||||
cfg := config{
|
||||
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
|
||||
Port: mainflux.Env(envPort, defPort),
|
||||
cfg := loadConfig()
|
||||
|
||||
logger, err := logger.New(os.Stdout, cfg.LogLevel)
|
||||
if err != nil {
|
||||
log.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
logger := log.New(os.Stdout)
|
||||
|
||||
nc, err := broker.Connect(cfg.NatsURL)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
|
||||
@@ -89,3 +92,11 @@ func main() {
|
||||
err = <-errs
|
||||
logger.Error(fmt.Sprintf("Normalizer service terminated: %s", err))
|
||||
}
|
||||
|
||||
func loadConfig() config {
|
||||
return config{
|
||||
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
|
||||
LogLevel: mainflux.Env(envLogLevel, defLogLevel),
|
||||
Port: mainflux.Env(envPort, defPort),
|
||||
}
|
||||
}
|
||||
|
||||
+30
-22
@@ -10,6 +10,7 @@ package main
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
@@ -20,7 +21,7 @@ import (
|
||||
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
|
||||
"github.com/go-redis/redis"
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/things"
|
||||
"github.com/mainflux/mainflux/things/api"
|
||||
grpcapi "github.com/mainflux/mainflux/things/api/grpc"
|
||||
@@ -34,6 +35,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
defLogLevel = "error"
|
||||
defDBHost = "localhost"
|
||||
defDBPort = "5432"
|
||||
defDBUser = "mainflux"
|
||||
@@ -45,6 +47,7 @@ const (
|
||||
defHTTPPort = "8180"
|
||||
defGRPCPort = "8181"
|
||||
defUsersURL = "localhost:8181"
|
||||
envLogLevel = "MF_THINGS_LOG_LEVEL"
|
||||
envDBHost = "MF_THINGS_DB_HOST"
|
||||
envDBPort = "MF_THINGS_DB_PORT"
|
||||
envDBUser = "MF_THINGS_DB_USER"
|
||||
@@ -59,6 +62,7 @@ const (
|
||||
)
|
||||
|
||||
type config struct {
|
||||
LogLevel string
|
||||
DBHost string
|
||||
DBPort string
|
||||
DBUser string
|
||||
@@ -66,18 +70,20 @@ type config struct {
|
||||
DBName string
|
||||
CacheURL string
|
||||
CachePass string
|
||||
CacheDB int
|
||||
CacheDB string
|
||||
HTTPPort string
|
||||
GRPCPort string
|
||||
UsersURL string
|
||||
}
|
||||
|
||||
func main() {
|
||||
logger := log.New(os.Stdout)
|
||||
cfg := loadConfig()
|
||||
|
||||
cfg := loadConfig(logger)
|
||||
|
||||
cache := connectToCache(cfg.CacheURL, cfg.CachePass, cfg.CacheDB)
|
||||
logger, err := logger.New(os.Stdout, cfg.LogLevel)
|
||||
if err != nil {
|
||||
log.Fatalf(err.Error())
|
||||
}
|
||||
cache := connectToCache(cfg.CacheURL, cfg.CachePass, cfg.CacheDB, logger)
|
||||
|
||||
db := connectToDB(cfg, logger)
|
||||
defer db.Close()
|
||||
@@ -97,18 +103,13 @@ func main() {
|
||||
errs <- fmt.Errorf("%s", <-c)
|
||||
}()
|
||||
|
||||
err := <-errs
|
||||
err = <-errs
|
||||
logger.Error(fmt.Sprintf("Things service terminated: %s", err))
|
||||
}
|
||||
|
||||
func loadConfig(logger log.Logger) config {
|
||||
db, err := strconv.Atoi(mainflux.Env(envCacheDB, defCacheDB))
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to connect to cache: %s", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
func loadConfig() config {
|
||||
return config{
|
||||
LogLevel: mainflux.Env(envLogLevel, defLogLevel),
|
||||
DBHost: mainflux.Env(envDBHost, defDBHost),
|
||||
DBPort: mainflux.Env(envDBPort, defDBPort),
|
||||
DBUser: mainflux.Env(envDBUser, defDBUser),
|
||||
@@ -116,22 +117,29 @@ func loadConfig(logger log.Logger) config {
|
||||
DBName: mainflux.Env(envDBName, defDBName),
|
||||
CacheURL: mainflux.Env(envCacheURL, defCacheURL),
|
||||
CachePass: mainflux.Env(envCachePass, defCachePass),
|
||||
CacheDB: db,
|
||||
CacheDB: mainflux.Env(envCacheDB, defCacheDB),
|
||||
HTTPPort: mainflux.Env(envHTTPPort, defHTTPPort),
|
||||
GRPCPort: mainflux.Env(envGRPCPort, defGRPCPort),
|
||||
UsersURL: mainflux.Env(envUsersURL, defUsersURL),
|
||||
}
|
||||
}
|
||||
|
||||
func connectToCache(cacheURL, cachePass string, cacheDB int) *redis.Client {
|
||||
func connectToCache(cacheURL, cachePass string, cacheDB string, logger logger.Logger) *redis.Client {
|
||||
|
||||
db, err := strconv.Atoi(cacheDB)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to connect to cache: %s", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
return redis.NewClient(&redis.Options{
|
||||
Addr: cacheURL,
|
||||
Password: cachePass,
|
||||
DB: cacheDB,
|
||||
DB: db,
|
||||
})
|
||||
}
|
||||
|
||||
func connectToDB(cfg config, logger log.Logger) *sql.DB {
|
||||
func connectToDB(cfg config, logger logger.Logger) *sql.DB {
|
||||
db, err := postgres.Connect(cfg.DBHost, cfg.DBPort, cfg.DBName, cfg.DBUser, cfg.DBPass)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to connect to postgres: %s", err))
|
||||
@@ -140,7 +148,7 @@ func connectToDB(cfg config, logger log.Logger) *sql.DB {
|
||||
return db
|
||||
}
|
||||
|
||||
func connectToUsersService(usersAddr string, logger log.Logger) *grpc.ClientConn {
|
||||
func connectToUsersService(usersAddr string, logger logger.Logger) *grpc.ClientConn {
|
||||
conn, err := grpc.Dial(usersAddr, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to connect to users service: %s", err))
|
||||
@@ -149,7 +157,7 @@ func connectToUsersService(usersAddr string, logger log.Logger) *grpc.ClientConn
|
||||
return conn
|
||||
}
|
||||
|
||||
func newService(conn *grpc.ClientConn, db *sql.DB, client *redis.Client, logger log.Logger) things.Service {
|
||||
func newService(conn *grpc.ClientConn, db *sql.DB, client *redis.Client, logger logger.Logger) things.Service {
|
||||
users := usersapi.NewClient(conn)
|
||||
thingsRepo := postgres.NewThingRepository(db, logger)
|
||||
channelsRepo := postgres.NewChannelRepository(db, logger)
|
||||
@@ -177,13 +185,13 @@ func newService(conn *grpc.ClientConn, db *sql.DB, client *redis.Client, logger
|
||||
return svc
|
||||
}
|
||||
|
||||
func startHTTPServer(svc things.Service, port string, logger log.Logger, errs chan error) {
|
||||
func startHTTPServer(svc things.Service, port string, logger logger.Logger, errs chan error) {
|
||||
p := fmt.Sprintf(":%s", port)
|
||||
logger.Info(fmt.Sprintf("Things service started, exposed port %s", port))
|
||||
errs <- http.ListenAndServe(p, httpapi.MakeHandler(svc))
|
||||
}
|
||||
|
||||
func startGRPCServer(svc things.Service, port string, logger log.Logger, errs chan error) {
|
||||
func startGRPCServer(svc things.Service, port string, logger logger.Logger, errs chan error) {
|
||||
p := fmt.Sprintf(":%s", port)
|
||||
listener, err := net.Listen("tcp", p)
|
||||
if err != nil {
|
||||
|
||||
+15
-8
@@ -10,6 +10,7 @@ package main
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
@@ -18,7 +19,7 @@ import (
|
||||
|
||||
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/users"
|
||||
"github.com/mainflux/mainflux/users/api"
|
||||
grpcapi "github.com/mainflux/mainflux/users/api/grpc"
|
||||
@@ -31,6 +32,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
defLogLevel = "error"
|
||||
defDBHost = "localhost"
|
||||
defDBPort = "5432"
|
||||
defDBUser = "mainflux"
|
||||
@@ -39,6 +41,7 @@ const (
|
||||
defHTTPPort = "8180"
|
||||
defGRPCPort = "8181"
|
||||
defSecret = "users"
|
||||
envLogLevel = "MF_USERS_LOG_LEVEL"
|
||||
envDBHost = "MF_USERS_DB_HOST"
|
||||
envDBPort = "MF_USERS_DB_PORT"
|
||||
envDBUser = "MF_USERS_DB_USER"
|
||||
@@ -50,6 +53,7 @@ const (
|
||||
)
|
||||
|
||||
type config struct {
|
||||
LogLevel string
|
||||
DBHost string
|
||||
DBPort string
|
||||
DBUser string
|
||||
@@ -63,8 +67,10 @@ type config struct {
|
||||
func main() {
|
||||
cfg := loadConfig()
|
||||
|
||||
logger := log.New(os.Stdout)
|
||||
|
||||
logger, err := logger.New(os.Stdout, cfg.LogLevel)
|
||||
if err != nil {
|
||||
log.Fatalf(err.Error())
|
||||
}
|
||||
db := connectToDB(cfg, logger)
|
||||
defer db.Close()
|
||||
|
||||
@@ -80,12 +86,13 @@ func main() {
|
||||
errs <- fmt.Errorf("%s", <-c)
|
||||
}()
|
||||
|
||||
err := <-errs
|
||||
err = <-errs
|
||||
logger.Error(fmt.Sprintf("Users service terminated: %s", err))
|
||||
}
|
||||
|
||||
func loadConfig() config {
|
||||
return config{
|
||||
LogLevel: mainflux.Env(envLogLevel, defLogLevel),
|
||||
DBHost: mainflux.Env(envDBHost, defDBHost),
|
||||
DBPort: mainflux.Env(envDBPort, defDBPort),
|
||||
DBUser: mainflux.Env(envDBUser, defDBUser),
|
||||
@@ -97,7 +104,7 @@ func loadConfig() config {
|
||||
}
|
||||
}
|
||||
|
||||
func connectToDB(cfg config, logger log.Logger) *sql.DB {
|
||||
func connectToDB(cfg config, logger logger.Logger) *sql.DB {
|
||||
db, err := postgres.Connect(cfg.DBHost, cfg.DBPort, cfg.DBName, cfg.DBUser, cfg.DBPass)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to connect to postgres: %s", err))
|
||||
@@ -106,7 +113,7 @@ func connectToDB(cfg config, logger log.Logger) *sql.DB {
|
||||
return db
|
||||
}
|
||||
|
||||
func newService(db *sql.DB, secret string, logger log.Logger) users.Service {
|
||||
func newService(db *sql.DB, secret string, logger logger.Logger) users.Service {
|
||||
repo := postgres.New(db)
|
||||
hasher := bcrypt.New()
|
||||
idp := jwt.New(secret)
|
||||
@@ -131,13 +138,13 @@ func newService(db *sql.DB, secret string, logger log.Logger) users.Service {
|
||||
return svc
|
||||
}
|
||||
|
||||
func startHTTPServer(svc users.Service, port string, logger log.Logger, errs chan error) {
|
||||
func startHTTPServer(svc users.Service, port string, logger logger.Logger, errs chan error) {
|
||||
p := fmt.Sprintf(":%s", port)
|
||||
logger.Info(fmt.Sprintf("Users HTTP service started, exposed port %s", port))
|
||||
errs <- http.ListenAndServe(p, httpapi.MakeHandler(svc, logger))
|
||||
}
|
||||
|
||||
func startGRPCServer(svc users.Service, port string, logger log.Logger, errs chan error) {
|
||||
func startGRPCServer(svc users.Service, port string, logger logger.Logger, errs chan error) {
|
||||
p := fmt.Sprintf(":%s", port)
|
||||
listener, err := net.Listen("tcp", p)
|
||||
if err != nil {
|
||||
|
||||
+18
-8
@@ -9,6 +9,7 @@ package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
@@ -16,7 +17,7 @@ import (
|
||||
|
||||
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/logger"
|
||||
thingsapi "github.com/mainflux/mainflux/things/api/grpc"
|
||||
adapter "github.com/mainflux/mainflux/ws"
|
||||
"github.com/mainflux/mainflux/ws/api"
|
||||
@@ -28,9 +29,11 @@ import (
|
||||
|
||||
const (
|
||||
defPort = "8180"
|
||||
defLogLevel = "error"
|
||||
defNatsURL = broker.DefaultURL
|
||||
defThingsURL = "localhost:8181"
|
||||
envPort = "MF_WS_ADAPTER_PORT"
|
||||
envLogLevel = "MF_WS_ADAPTER_LOG_LEVEL"
|
||||
envNatsURL = "MF_NATS_URL"
|
||||
envThingsURL = "MF_THINGS_URL"
|
||||
)
|
||||
@@ -38,18 +41,17 @@ const (
|
||||
type config struct {
|
||||
ThingsURL string
|
||||
NatsURL string
|
||||
LogLevel string
|
||||
Port string
|
||||
}
|
||||
|
||||
func main() {
|
||||
cfg := config{
|
||||
ThingsURL: mainflux.Env(envThingsURL, defThingsURL),
|
||||
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
|
||||
Port: mainflux.Env(envPort, defPort),
|
||||
cfg := loadConfig()
|
||||
|
||||
logger, err := logger.New(os.Stdout, cfg.LogLevel)
|
||||
if err != nil {
|
||||
log.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
logger := log.New(os.Stdout)
|
||||
|
||||
nc, err := broker.Connect(cfg.NatsURL)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
|
||||
@@ -101,3 +103,11 @@ func main() {
|
||||
err = <-errs
|
||||
logger.Error(fmt.Sprintf("WebSocket adapter terminated: %s", err))
|
||||
}
|
||||
|
||||
func loadConfig() config {
|
||||
return config{
|
||||
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
|
||||
LogLevel: mainflux.Env(envLogLevel, defLogLevel),
|
||||
Port: mainflux.Env(envPort, defPort),
|
||||
}
|
||||
}
|
||||
|
||||
+8
-6
@@ -8,11 +8,12 @@ The service is configured using the environment variables presented in the
|
||||
following table. Note that any unset variables will be replaced with their
|
||||
default values.
|
||||
|
||||
| Variable | Description | Default |
|
||||
|----------------------|---------------------|-----------------------|
|
||||
| MF_HTTP_ADAPTER_PORT | Service HTTP port | 8180 |
|
||||
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
|
||||
| MF_THINGS_URL | Things service URL | localhost:8181 |
|
||||
| Variable | Description | Default |
|
||||
|-----------------------------|--------------------------------|-----------------------|
|
||||
| MF_HTTP_ADAPTER_LOG_LEVEL | Log level for the HTTP Adapter | error |
|
||||
| MF_HTTP_ADAPTER_PORT | Service HTTP port | 8180 |
|
||||
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
|
||||
| MF_THINGS_URL | Things service URL | localhost:8181 |
|
||||
|
||||
## Deployment
|
||||
|
||||
@@ -30,6 +31,7 @@ services:
|
||||
environment:
|
||||
MF_THINGS_URL: [Things service URL]
|
||||
MF_NATS_URL: [NATS instance URL]
|
||||
MF_HTTP_ADAPTER_LOG_LEVEL: [HTTP Adapter Log Level]
|
||||
MF_HTTP_ADAPTER_PORT: [Service HTTP port]
|
||||
```
|
||||
|
||||
@@ -48,7 +50,7 @@ make http
|
||||
make install
|
||||
|
||||
# set the environment variables and run the service
|
||||
MF_THINGS_URL=[Things service URL] MF_NATS_URL=[NATS instance URL] MF_HTTP_ADAPTER_PORT=[Service HTTP port] $GOBIN/mainflux-http
|
||||
MF_THINGS_URL=[Things service URL] MF_NATS_URL=[NATS instance URL] MF_HTTP_ADAPTER_LOG_LEVEL=[HTTP Adapter Log Level] MF_HTTP_ADAPTER_PORT=[Service HTTP port] $GOBIN/mainflux-http
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
@@ -7,6 +7,11 @@
|
||||
|
||||
package logger
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
// Error level is used when logging errors.
|
||||
Error Level = iota + 1
|
||||
@@ -14,8 +19,12 @@ const (
|
||||
Warn
|
||||
// Info level is used when logging info data.
|
||||
Info
|
||||
// Debug level is used when logging debugging info.
|
||||
Debug
|
||||
)
|
||||
|
||||
var ErrInvalidLogLevel = errors.New("unrecognized log level")
|
||||
|
||||
// Level represents severity level while logging.
|
||||
type Level int
|
||||
|
||||
@@ -23,8 +32,29 @@ var levels = map[Level]string{
|
||||
Error: "error",
|
||||
Warn: "warn",
|
||||
Info: "info",
|
||||
Debug: "debug",
|
||||
}
|
||||
|
||||
func (lvl Level) String() string {
|
||||
return levels[lvl]
|
||||
}
|
||||
|
||||
func (lvl Level) isAllowed(logLevel Level) bool {
|
||||
return lvl <= logLevel
|
||||
}
|
||||
|
||||
func (lvl *Level) UnmarshalText(text string) error {
|
||||
switch string(strings.ToLower(text)) {
|
||||
case "debug":
|
||||
*lvl = Debug
|
||||
case "info":
|
||||
*lvl = Info
|
||||
case "warn":
|
||||
*lvl = Warn
|
||||
case "error":
|
||||
*lvl = Error
|
||||
default:
|
||||
return ErrInvalidLogLevel
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
package logger
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestUnmarshalText(t *testing.T) {
|
||||
cases := map[string]struct {
|
||||
input string
|
||||
output Level
|
||||
err error
|
||||
}{
|
||||
"select log level Not_A_Level": {"Not_A_Level", 1, ErrInvalidLogLevel},
|
||||
"select log level Bad_Input": {"Bad_Input", 1, ErrInvalidLogLevel},
|
||||
|
||||
"select log level debug": {"debug", Debug, nil},
|
||||
"select log level DEBUG": {"DEBUG", Debug, nil},
|
||||
"select log level info": {"info", Info, nil},
|
||||
"select log level INFO": {"INFO", Info, nil},
|
||||
"select log level warn": {"warn", Warn, nil},
|
||||
"select log level WARN": {"WARN", Warn, nil},
|
||||
"select log level Error": {"Error", Error, nil},
|
||||
"select log level ERROR": {"ERROR", Error, nil},
|
||||
}
|
||||
|
||||
var logLevel Level
|
||||
for desc, tc := range cases {
|
||||
err := logLevel.UnmarshalText(tc.input)
|
||||
assert.Equal(t, tc.output, logLevel, fmt.Sprintf("%s: expected %s got %d", desc, tc.output, logLevel))
|
||||
assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected %s got %d", desc, tc.err, err))
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestLevelIsAllowed(t *testing.T) {
|
||||
cases := map[string]struct {
|
||||
requestedLevel Level
|
||||
allowedLevel Level
|
||||
output bool
|
||||
}{
|
||||
"log debug when level debug": {Debug, Debug, true},
|
||||
"log info when level debug": {Info, Debug, true},
|
||||
"log warn when level debug": {Warn, Debug, true},
|
||||
"log error when level debug": {Error, Debug, true},
|
||||
"log warn when level info": {Warn, Info, true},
|
||||
"log error when level warn": {Error, Warn, true},
|
||||
"log error when level error": {Error, Error, true},
|
||||
|
||||
"log debug when level error": {Debug, Error, false},
|
||||
"log info when level error": {Info, Error, false},
|
||||
"log warn when level error": {Warn, Error, false},
|
||||
"log debug when level warn": {Debug, Warn, false},
|
||||
"log info when level warn": {Info, Warn, false},
|
||||
"log debug when level info": {Debug, Info, false},
|
||||
}
|
||||
for desc, tc := range cases {
|
||||
result := tc.requestedLevel.isAllowed(tc.allowedLevel)
|
||||
assert.Equal(t, tc.output, result, fmt.Sprintf("%s: expected %t got %t", desc, tc.output, result))
|
||||
}
|
||||
}
|
||||
+28
-7
@@ -8,13 +8,16 @@
|
||||
package logger
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"fmt"
|
||||
"github.com/go-kit/kit/log"
|
||||
"io"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Logger specifies logging API.
|
||||
type Logger interface {
|
||||
// Debug logs any object in JSON format on debug level.
|
||||
Debug(string)
|
||||
// Info logs any object in JSON format on info level.
|
||||
Info(string)
|
||||
// Warn logs any object in JSON format on warning level.
|
||||
@@ -27,23 +30,41 @@ var _ Logger = (*logger)(nil)
|
||||
|
||||
type logger struct {
|
||||
kitLogger log.Logger
|
||||
level Level
|
||||
}
|
||||
|
||||
// New returns wrapped go kit logger.
|
||||
func New(out io.Writer) Logger {
|
||||
func New(out io.Writer, levelText string) (Logger, error) {
|
||||
var level Level
|
||||
err := level.UnmarshalText(levelText)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(`{"level":"error","message":"%s: %s","ts":"%s"}`, err, levelText, time.RFC3339Nano)
|
||||
}
|
||||
l := log.NewJSONLogger(log.NewSyncWriter(out))
|
||||
l = log.With(l, "ts", log.DefaultTimestampUTC)
|
||||
return &logger{l}
|
||||
return &logger{l, level}, err
|
||||
}
|
||||
|
||||
func (l logger) Debug(msg string) {
|
||||
if Debug.isAllowed(l.level) {
|
||||
l.kitLogger.Log("level", Debug.String(), "message", msg)
|
||||
}
|
||||
}
|
||||
|
||||
func (l logger) Info(msg string) {
|
||||
l.kitLogger.Log("level", Info.String(), "message", msg)
|
||||
if Info.isAllowed(l.level) {
|
||||
l.kitLogger.Log("level", Info.String(), "message", msg)
|
||||
}
|
||||
}
|
||||
|
||||
func (l logger) Warn(msg string) {
|
||||
l.kitLogger.Log("level", Warn.String(), "message", msg)
|
||||
if Warn.isAllowed(l.level) {
|
||||
l.kitLogger.Log("level", Warn.String(), "message", msg)
|
||||
}
|
||||
}
|
||||
|
||||
func (l logger) Error(msg string) {
|
||||
l.kitLogger.Log("level", Error.String(), "message", msg)
|
||||
if Error.isAllowed(l.level) {
|
||||
l.kitLogger.Log("level", Error.String(), "message", msg)
|
||||
}
|
||||
}
|
||||
|
||||
+49
-19
@@ -18,6 +18,10 @@ import (
|
||||
)
|
||||
|
||||
var _ io.Writer = (*mockWriter)(nil)
|
||||
var writer mockWriter
|
||||
var logger log.Logger
|
||||
var err error
|
||||
var output logMsg
|
||||
|
||||
type mockWriter struct {
|
||||
value []byte
|
||||
@@ -39,42 +43,68 @@ type logMsg struct {
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
func TestInfo(t *testing.T) {
|
||||
func TestDebug(t *testing.T) {
|
||||
cases := map[string]struct {
|
||||
input string
|
||||
output logMsg
|
||||
input string
|
||||
logLevel string
|
||||
output logMsg
|
||||
}{
|
||||
"info log ordinary string": {"input_string", logMsg{log.Info.String(), "input_string"}},
|
||||
"info log empty string": {"", logMsg{log.Info.String(), ""}},
|
||||
"debug log ordinary string": {"input_string", log.Debug.String(), logMsg{log.Debug.String(), "input_string"}},
|
||||
"debug log empty string": {"", log.Debug.String(), logMsg{log.Debug.String(), ""}},
|
||||
"debug ordinary string lvl not allowed": {"input_string", log.Info.String(), logMsg{"", ""}},
|
||||
"debug empty string lvl not allowed": {"", log.Info.String(), logMsg{"", ""}},
|
||||
}
|
||||
|
||||
writer := mockWriter{}
|
||||
logger := log.New(&writer)
|
||||
for desc, tc := range cases {
|
||||
writer = mockWriter{}
|
||||
logger, err = log.New(&writer, tc.logLevel)
|
||||
assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", desc, err))
|
||||
logger.Debug(tc.input)
|
||||
output, err = writer.Read()
|
||||
assert.Equal(t, tc.output, output, fmt.Sprintf("%s: expected %s got %s", desc, tc.output, output))
|
||||
}
|
||||
}
|
||||
|
||||
func TestInfo(t *testing.T) {
|
||||
cases := map[string]struct {
|
||||
input string
|
||||
logLevel string
|
||||
output logMsg
|
||||
}{
|
||||
"info log ordinary string": {"input_string", log.Info.String(), logMsg{log.Info.String(), "input_string"}},
|
||||
"info log empty string": {"", log.Info.String(), logMsg{log.Info.String(), ""}},
|
||||
"info ordinary string lvl not allowed": {"input_string", log.Warn.String(), logMsg{"", ""}},
|
||||
"info empty string lvl not allowed": {"", log.Warn.String(), logMsg{"", ""}},
|
||||
}
|
||||
|
||||
for desc, tc := range cases {
|
||||
logger.Info(tc.input)
|
||||
output, err := writer.Read()
|
||||
writer = mockWriter{}
|
||||
logger, err = log.New(&writer, tc.logLevel)
|
||||
assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", desc, err))
|
||||
logger.Info(tc.input)
|
||||
output, err = writer.Read()
|
||||
assert.Equal(t, tc.output, output, fmt.Sprintf("%s: expected %s got %s", desc, tc.output, output))
|
||||
}
|
||||
}
|
||||
|
||||
func TestWarn(t *testing.T) {
|
||||
cases := map[string]struct {
|
||||
input string
|
||||
output logMsg
|
||||
input string
|
||||
logLevel string
|
||||
output logMsg
|
||||
}{
|
||||
"warn log ordinary string": {"input_string", logMsg{log.Warn.String(), "input_string"}},
|
||||
"warn log empty string": {"", logMsg{log.Warn.String(), ""}},
|
||||
"warn log ordinary string": {"input_string", log.Warn.String(), logMsg{log.Warn.String(), "input_string"}},
|
||||
"warn log empty string": {"", log.Warn.String(), logMsg{log.Warn.String(), ""}},
|
||||
"warn ordinary string lvl not allowed": {"input_string", log.Error.String(), logMsg{"", ""}},
|
||||
"warn empty string lvl not allowed": {"", log.Error.String(), logMsg{"", ""}},
|
||||
}
|
||||
|
||||
writer := mockWriter{}
|
||||
logger := log.New(&writer)
|
||||
|
||||
for desc, tc := range cases {
|
||||
logger.Warn(tc.input)
|
||||
output, err := writer.Read()
|
||||
writer = mockWriter{}
|
||||
logger, err = log.New(&writer, tc.logLevel)
|
||||
assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", desc, err))
|
||||
logger.Warn(tc.input)
|
||||
output, err = writer.Read()
|
||||
assert.Equal(t, tc.output, output, fmt.Sprintf("%s: expected %s got %s", desc, tc.output, output))
|
||||
}
|
||||
}
|
||||
@@ -89,7 +119,7 @@ func TestError(t *testing.T) {
|
||||
}
|
||||
|
||||
writer := mockWriter{}
|
||||
logger := log.New(&writer)
|
||||
logger, _ := log.New(&writer, log.Error.String())
|
||||
|
||||
for desc, tc := range cases {
|
||||
logger.Error(tc.input)
|
||||
|
||||
+13
-11
@@ -9,16 +9,17 @@ The service is configured using the environment variables presented in the
|
||||
following table. Note that any unset variables will be replaced with their
|
||||
default values.
|
||||
|
||||
| Variable | Description | Default |
|
||||
|----------------------|---------------------|-----------------------|
|
||||
| MF_MQTT_ADAPTER_PORT | Service MQTT port | 1883 |
|
||||
| MF_MQTT_WS_PORT | WebSocket port | 8880 |
|
||||
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
|
||||
| MF_MQTT_REDIS_PORT | Redis port | 6379 |
|
||||
| MF_MQTT_REDIS_HOST | Redis host | localhost |
|
||||
| MF_MQTT_REDIS_PASS | Redis pass | mqtt |
|
||||
| MF_MQTT_REDIS_DB | Redis db | 0 |
|
||||
| MF_THINGS_URL | Things service URL | localhost:8181 |
|
||||
| Variable | Description | Default |
|
||||
|-----------------------------|------------------------|-----------------------|
|
||||
| MF_MQTT_ADAPTER_LOG_LEVEL | MQTT adapter log level | error |
|
||||
| MF_MQTT_ADAPTER_PORT | Service MQTT port | 1883 |
|
||||
| MF_MQTT_WS_PORT | WebSocket port | 8880 |
|
||||
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
|
||||
| MF_MQTT_REDIS_PORT | Redis port | 6379 |
|
||||
| MF_MQTT_REDIS_HOST | Redis host | localhost |
|
||||
| MF_MQTT_REDIS_PASS | Redis pass | mqtt |
|
||||
| MF_MQTT_REDIS_DB | Redis db | 0 |
|
||||
| MF_THINGS_URL | Things service URL | localhost:8181 |
|
||||
|
||||
## Deployment
|
||||
|
||||
@@ -36,6 +37,7 @@ services:
|
||||
environment:
|
||||
MF_THINGS_URL: [Things service URL]
|
||||
MF_NATS_URL: [NATS instance URL]
|
||||
MF_MQTT_ADAPTER_LOG_LEVEL: [MQTT adapter log level]
|
||||
MF_MQTT_ADAPTER_PORT: [Service MQTT port]
|
||||
MF_MQTT_WS_PORT: [Service WS port]
|
||||
MF_MQTT_REDIS_PORT: [Redis port]
|
||||
@@ -56,7 +58,7 @@ cd $GOPATH/src/github.com/mainflux/mainflux/mqtt
|
||||
npm install
|
||||
|
||||
# set the environment variables and run the service
|
||||
MF_THINGS_URL=[Things service URL] MF_NATS_URL=[NATS instance URL] MF_MQTT_ADAPTER_PORT=[Service MQTT port] MF_MQTT_WS_PORT=[Service WS port] MF_MQTT_REDIS_PORT=[Redis port] MF_MQTT_REDIS_HOST=[Redis host] MF_MQTT_REDIS_PASS=[Redis pass] MF_MQTT_REDIS_DB=[Redis db] node mqtt.js ..
|
||||
MF_THINGS_URL=[Things service URL] MF_NATS_URL=[NATS instance URL] MF_MQTT_ADAPTER_LOG_LEVEL=[MQTT adapter log level] MF_MQTT_ADAPTER_PORT=[Service MQTT port] MF_MQTT_WS_PORT=[Service WS port] MF_MQTT_REDIS_PORT=[Redis port] MF_MQTT_REDIS_HOST=[Redis host] MF_MQTT_REDIS_PASS=[Redis pass] MF_MQTT_REDIS_DB=[Redis db] node mqtt.js ..
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
+5
-1
@@ -12,6 +12,7 @@ var http = require('http'),
|
||||
// pass a proto file as a buffer/string or pass a parsed protobuf-schema object
|
||||
var logger = bunyan.createLogger({name: "mqtt"}),
|
||||
config = {
|
||||
log_level: process.env.MF_MQTT_ADAPTER_LOG_LEVEL || 'error',
|
||||
mqtt_port: Number(process.env.MF_MQTT_ADAPTER_PORT) || 1883,
|
||||
ws_port: Number(process.env.MF_MQTT_WS_PORT) || 8880,
|
||||
nats_url: process.env.MF_NATS_URL || 'nats://localhost:4222',
|
||||
@@ -42,9 +43,12 @@ var logger = bunyan.createLogger({name: "mqtt"}),
|
||||
|
||||
logging({
|
||||
instance: aedes,
|
||||
servers: servers
|
||||
servers: servers,
|
||||
pinoOptions: {level: config.log_level}
|
||||
});
|
||||
|
||||
logger.level(config.log_level);
|
||||
|
||||
// MQTT over WebSocket
|
||||
function startWs() {
|
||||
var server = http.createServer();
|
||||
|
||||
+1
-1
@@ -15,7 +15,7 @@
|
||||
"dependencies": {
|
||||
"2": "^1.0.2",
|
||||
"aedes": "^0.35.2",
|
||||
"aedes-logging": "^1.0.1",
|
||||
"aedes-logging": "^2.0.1",
|
||||
"aedes-persistence-redis": "^5.1.0",
|
||||
"atob": "^2.0.3",
|
||||
"bunyan": "^1.5.1",
|
||||
|
||||
@@ -9,10 +9,11 @@ The service is configured using the environment variables presented in the
|
||||
following table. Note that any unset variables will be replaced with their
|
||||
default values.
|
||||
|
||||
| Variable | Description | Default |
|
||||
|--------------------|------------------------------|-----------------------|
|
||||
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
|
||||
| MF_NORMALIZER_PORT | Normalizer service HTTP port | 8180 |
|
||||
| Variable | Description | Default |
|
||||
|---------------------------|------------------------------|-----------------------|
|
||||
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
|
||||
| MF_NORMALIZER_LOG_LEVEL | Log level for the Normalizer | error |
|
||||
| MF_NORMALIZER_PORT | Normalizer service HTTP port | 8180 |
|
||||
|
||||
## Deployment
|
||||
|
||||
@@ -28,6 +29,7 @@ services:
|
||||
container_name: [instance name]
|
||||
environment:
|
||||
MF_NATS_URL: [NATS instance URL]
|
||||
MF_NORMALIZER_LOG_LEVEL: [Normalizer log level]
|
||||
MF_NORMALIZER_PORT: [Service HTTP port]
|
||||
```
|
||||
|
||||
@@ -46,5 +48,5 @@ make normalizer
|
||||
make install
|
||||
|
||||
# set the environment variables and run the service
|
||||
MF_NATS_URL=[NATS instance URL] MF_NORMALIZER_PORT=[Service HTTP port] $GOBIN/mainflux-normalizer
|
||||
MF_NATS_URL=[NATS instance URL] MF_NORMALIZER_LOG_LEVEL=[Normalizer log level] MF_NORMALIZER_PORT=[Service HTTP port] $GOBIN/mainflux-normalizer
|
||||
```
|
||||
|
||||
@@ -12,19 +12,19 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/normalizer"
|
||||
)
|
||||
|
||||
var _ normalizer.Service = (*loggingMiddleware)(nil)
|
||||
|
||||
type loggingMiddleware struct {
|
||||
logger log.Logger
|
||||
logger logger.Logger
|
||||
svc normalizer.Service
|
||||
}
|
||||
|
||||
// LoggingMiddleware adds logging facilities to the core service.
|
||||
func LoggingMiddleware(svc normalizer.Service, logger log.Logger) normalizer.Service {
|
||||
func LoggingMiddleware(svc normalizer.Service, logger logger.Logger) normalizer.Service {
|
||||
return &loggingMiddleware{
|
||||
logger: logger,
|
||||
svc: svc,
|
||||
|
||||
@@ -14,7 +14,7 @@ import (
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/normalizer"
|
||||
nats "github.com/nats-io/go-nats"
|
||||
"github.com/nats-io/go-nats"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -14,19 +14,19 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/readers"
|
||||
)
|
||||
|
||||
var _ readers.MessageRepository = (*loggingMiddleware)(nil)
|
||||
|
||||
type loggingMiddleware struct {
|
||||
logger log.Logger
|
||||
logger logger.Logger
|
||||
svc readers.MessageRepository
|
||||
}
|
||||
|
||||
// LoggingMiddleware adds logging facilities to the core service.
|
||||
func LoggingMiddleware(svc readers.MessageRepository, logger log.Logger) readers.MessageRepository {
|
||||
func LoggingMiddleware(svc readers.MessageRepository, logger logger.Logger) readers.MessageRepository {
|
||||
return &loggingMiddleware{
|
||||
logger: logger,
|
||||
svc: svc,
|
||||
|
||||
@@ -18,7 +18,7 @@ import (
|
||||
dockertest "gopkg.in/ory-am/dockertest.v3"
|
||||
)
|
||||
|
||||
var logger = log.New(os.Stdout)
|
||||
var logger, _ = log.New(os.Stdout, log.Info.String())
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
pool, err := dockertest.NewPool("")
|
||||
|
||||
@@ -34,7 +34,7 @@ var (
|
||||
Publisher: 1,
|
||||
Protocol: "mqtt",
|
||||
}
|
||||
testLog = log.New(os.Stdout)
|
||||
testLog, _ = log.New(os.Stdout, log.Info.String())
|
||||
)
|
||||
|
||||
func TestReadAll(t *testing.T) {
|
||||
|
||||
@@ -39,7 +39,7 @@ var (
|
||||
Publisher: 1,
|
||||
Protocol: "mqtt",
|
||||
}
|
||||
testLog = log.New(os.Stdout)
|
||||
testLog, _ = log.New(os.Stdout, log.Info.String())
|
||||
)
|
||||
|
||||
func TestReadAll(t *testing.T) {
|
||||
|
||||
+16
-14
@@ -17,19 +17,20 @@ The service is configured using the environment variables presented in the
|
||||
following table. Note that any unset variables will be replaced with their
|
||||
default values.
|
||||
|
||||
| Variable | Description | Default |
|
||||
|-----------------------|------------------------------------------|----------------|
|
||||
| MF_THINGS_DB_HOST | Database host address | localhost |
|
||||
| MF_THINGS_DB_PORT | Database host port | 5432 |
|
||||
| MF_THINGS_DB_USER | Database user | mainflux |
|
||||
| MF_THINGS_DB_PASS | Database password | mainflux |
|
||||
| MF_THINGS_DB | Name of the database used by the service | things |
|
||||
| MF_THINGS_CACHE_URL | Cache database URL | localhost:6379 |
|
||||
| MF_THINGS_CACHE_PASS | Cache database password | |
|
||||
| MF_THINGS_CACHE_DB | Cache instance that should be used | 0 |
|
||||
| MF_THINGS_HTTP_PORT | Things service HTTP port | 8180 |
|
||||
| MF_THINGS_GRPC_PORT | Things service gRPC port | 8181 |
|
||||
| MF_USERS_URL | Users service URL | localhost:8181 |
|
||||
| Variable | Description | Default |
|
||||
|-----------------------|-------------------------------------------------|----------------|
|
||||
| MF_THINGS_LOG_LEVEL | Log level for Things (debug, info, warn, error) | error |
|
||||
| MF_THINGS_DB_HOST | Database host address | localhost |
|
||||
| MF_THINGS_DB_PORT | Database host port | 5432 |
|
||||
| MF_THINGS_DB_USER | Database user | mainflux |
|
||||
| MF_THINGS_DB_PASS | Database password | mainflux |
|
||||
| MF_THINGS_DB | Name of the database used by the service | things |
|
||||
| MF_THINGS_CACHE_URL | Cache database URL | localhost:6379 |
|
||||
| MF_THINGS_CACHE_PASS | Cache database password | |
|
||||
| MF_THINGS_CACHE_DB | Cache instance that should be used | 0 |
|
||||
| MF_THINGS_HTTP_PORT | Things service HTTP port | 8180 |
|
||||
| MF_THINGS_GRPC_PORT | Things service gRPC port | 8181 |
|
||||
| MF_USERS_URL | Users service URL | localhost:8181 |
|
||||
|
||||
## Deployment
|
||||
|
||||
@@ -46,6 +47,7 @@ services:
|
||||
ports:
|
||||
- [host machine port]:[configured HTTP port]
|
||||
environment:
|
||||
MF_THINGS_LOG_LEVEL: [Things log level]
|
||||
MF_THINGS_DB_HOST: [Database host address]
|
||||
MF_THINGS_DB_PORT: [Database host port]
|
||||
MF_THINGS_DB_USER: [Database user]
|
||||
@@ -75,7 +77,7 @@ make things
|
||||
make install
|
||||
|
||||
# set the environment variables and run the service
|
||||
MF_THINGS_DB_HOST=[Database host address] MF_THINGS_DB_PORT=[Database host port] MF_THINGS_DB_USER=[Database user] MF_THINGS_DB_PASS=[Database password] MF_THINGS_DB=[Name of the database used by the service] MF_THINGS_CACHE_URL=[Cache database URL] MF_THINGS_CACHE_PASS=[Cache database password] MF_THINGS_CACHE_DB=[Cache instance that should be used] MF_THINGS_HTTP_PORT=[Service HTTP port] MF_THINGS_GRPC_PORT=[Service gRPC port] MF_USERS_URL=[Users service URL] $GOBIN/mainflux-things
|
||||
MF_THINGS_LOG_LEVEL=[Things log level] MF_THINGS_DB_HOST=[Database host address] MF_THINGS_DB_PORT=[Database host port] MF_THINGS_DB_USER=[Database user] MF_THINGS_DB_PASS=[Database password] MF_THINGS_DB=[Name of the database used by the service] MF_THINGS_CACHE_URL=[Cache database URL] MF_THINGS_CACHE_PASS=[Cache database password] MF_THINGS_CACHE_DB=[Cache instance that should be used] MF_THINGS_HTTP_PORT=[Service HTTP port] MF_THINGS_GRPC_PORT=[Service gRPC port] MF_USERS_URL=[Users service URL] $GOBIN/mainflux-things
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
@@ -27,7 +27,7 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
testLog = logger.New(os.Stdout)
|
||||
testLog, _ = logger.New(os.Stdout, logger.Info.String())
|
||||
db *sql.DB
|
||||
)
|
||||
|
||||
|
||||
+13
-11
@@ -16,16 +16,17 @@ The service is configured using the environment variables presented in the
|
||||
following table. Note that any unset variables will be replaced with their
|
||||
default values.
|
||||
|
||||
| Variable | Description | Default |
|
||||
|----------------------|------------------------------------------|--------------|
|
||||
| MF_USERS_DB_HOST | Database host address | localhost |
|
||||
| MF_USERS_DB_PORT | Database host port | 5432 |
|
||||
| MF_USERS_DB_USER | Database user | mainflux |
|
||||
| MF_USERS_DB_PASSWORD | Database password | mainflux |
|
||||
| MF_USERS_DB | Name of the database used by the service | users |
|
||||
| MF_USERS_HTTP_PORT | Users service HTTP port | 8180 |
|
||||
| MF_USERS_GRPC_PORT | Users service gRPC port | 8181 |
|
||||
| MF_USERS_SECRET | String used for signing tokens | users |
|
||||
| Variable | Description | Default |
|
||||
|----------------------|-------------------------------------------------|--------------|
|
||||
| MF_USERS_LOG_LEVEL | Log level for Users (debug, info, warn, error) | error |
|
||||
| MF_USERS_DB_HOST | Database host address | localhost |
|
||||
| MF_USERS_DB_PORT | Database host port | 5432 |
|
||||
| MF_USERS_DB_USER | Database user | mainflux |
|
||||
| MF_USERS_DB_PASSWORD | Database password | mainflux |
|
||||
| MF_USERS_DB | Name of the database used by the service | users |
|
||||
| MF_USERS_HTTP_PORT | Users service HTTP port | 8180 |
|
||||
| MF_USERS_GRPC_PORT | Users service gRPC port | 8181 |
|
||||
| MF_USERS_SECRET | String used for signing tokens | users |
|
||||
|
||||
## Deployment
|
||||
|
||||
@@ -42,6 +43,7 @@ services:
|
||||
ports:
|
||||
- [host machine port]:[configured HTTP port]
|
||||
environment:
|
||||
MF_USERS_LOG_LEVEL: [Users log level]
|
||||
MF_USERS_DB_HOST: [Database host address]
|
||||
MF_USERS_DB_PORT: [Database host port]
|
||||
MF_USERS_DB_USER: [Database user]
|
||||
@@ -67,7 +69,7 @@ make users
|
||||
make install
|
||||
|
||||
# set the environment variables and run the service
|
||||
MF_USERS_DB_HOST=[Database host address] MF_USERS_DB_PORT=[Database host port] MF_USERS_DB_USER=[Database user] MF_USERS_DB_PASS=[Database password] MF_USERS_DB=[Name of the database used by the service] MF_USERS_HTTP_PORT=[Service HTTP port] MF_USERS_GRPC_PORT=[Service gRPC port] MF_USERS_SECRET=[String used for signing tokens] $GOBIN/mainflux-users
|
||||
MF_USERS_LOG_LEVEL=[Users log level] MF_USERS_DB_HOST=[Database host address] MF_USERS_DB_PORT=[Database host port] MF_USERS_DB_USER=[Database user] MF_USERS_DB_PASS=[Database password] MF_USERS_DB=[Name of the database used by the service] MF_USERS_HTTP_PORT=[Service HTTP port] MF_USERS_GRPC_PORT=[Service gRPC port] MF_USERS_SECRET=[String used for signing tokens] $GOBIN/mainflux-users
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
@@ -66,7 +66,7 @@ func newService() users.Service {
|
||||
}
|
||||
|
||||
func newServer(svc users.Service) *httptest.Server {
|
||||
logger := log.New(os.Stdout)
|
||||
logger, _:= log.New(os.Stdout, log.Info.String())
|
||||
mux := httpapi.MakeHandler(svc, logger)
|
||||
return httptest.NewServer(mux)
|
||||
}
|
||||
|
||||
@@ -8,12 +8,13 @@ The service is configured using the environment variables presented in the
|
||||
following table. Note that any unset variables will be replaced with their
|
||||
default values.
|
||||
|
||||
| Variable | Description | Default |
|
||||
|---------------------------------|---------------------------------------------|-----------------------|
|
||||
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
|
||||
| MF_CASSANDRA_WRITER_PORT | Service HTTP port | 8180 |
|
||||
| MF_CASSANDRA_WRITER_DB_CLUSTER | Cassandra cluster comma separated addresses | 127.0.0.1 |
|
||||
| MF_CASSANDRA_WRITER_DB_KEYSPACE | Cassandra keyspace name | mainflux |
|
||||
| Variable | Description | Default |
|
||||
|---------------------------------|------------------------------------------------------------|-----------------------|
|
||||
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
|
||||
| MF_CASSANDRA_WRITER_LOG_LEVEL | Log level for Cassandra writer (debug, info, warn, error) | error |
|
||||
| MF_CASSANDRA_WRITER_PORT | Service HTTP port | 8180 |
|
||||
| MF_CASSANDRA_WRITER_DB_CLUSTER | Cassandra cluster comma separated addresses | 127.0.0.1 |
|
||||
| MF_CASSANDRA_WRITER_DB_KEYSPACE | Cassandra keyspace name | mainflux |
|
||||
|
||||
## Deployment
|
||||
|
||||
@@ -27,6 +28,7 @@ default values.
|
||||
restart: on-failure
|
||||
environment:
|
||||
MF_NATS_URL: [NATS instance URL]
|
||||
MF_CASSANDRA_WRITER_LOG_LEVEL: [Cassandra writer log level]
|
||||
MF_CASSANDRA_WRITER_PORT: [Service HTTP port]
|
||||
MF_CASSANDRA_WRITER_DB_CLUSTER: [Cassandra cluster comma separated addresses]
|
||||
MF_CASSANDRA_WRITER_DB_KEYSPACE: [Cassandra keyspace name]
|
||||
@@ -50,7 +52,7 @@ make cassandra-writer
|
||||
make install
|
||||
|
||||
# Set the environment variables and run the service
|
||||
MF_NATS_URL=[NATS instance URL] MF_CASSANDRA_WRITER_PORT=[Service HTTP port] MF_CASSANDRA_WRITER_DB_CLUSTER=[Cassandra cluster comma separated addresses] MF_CASSANDRA_WRITER_DB_KEYSPACE=[Cassandra keyspace name] $GOBIN/mainflux-cassandra-writer
|
||||
MF_NATS_URL=[NATS instance URL] MF_CASSANDRA_WRITER_LOG_LEVEL=[Cassandra writer log level] MF_CASSANDRA_WRITER_PORT=[Service HTTP port] MF_CASSANDRA_WRITER_DB_CLUSTER=[Cassandra cluster comma separated addresses] MF_CASSANDRA_WRITER_DB_KEYSPACE=[Cassandra keyspace name] $GOBIN/mainflux-cassandra-writer
|
||||
|
||||
```
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ import (
|
||||
dockertest "gopkg.in/ory-am/dockertest.v3"
|
||||
)
|
||||
|
||||
var logger = log.New(os.Stdout)
|
||||
var logger, _ = log.New(os.Stdout, log.Info.String())
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
pool, err := dockertest.NewPool("")
|
||||
|
||||
+14
-12
@@ -8,17 +8,18 @@ The service is configured using the environment variables presented in the
|
||||
following table. Note that any unset variables will be replaced with their
|
||||
default values.
|
||||
|
||||
| Variable | Description | Default |
|
||||
|--------------------------------|---------------------------------------------|-----------------------|
|
||||
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
|
||||
| MF_INFLUX_WRITER_PORT | Service HTTP port | 8180 |
|
||||
| MF_INFLUX_WRITER_BATCH_SIZE | Size of the writer points batch | 5000 |
|
||||
| MF_INFLUX_WRITER_BATCH_TIMEOUT | Time interval in seconds to flush the batch | 1 second |
|
||||
| MF_INFLUX_WRITER_DB_NAME | InfluxDB database name | mainflux |
|
||||
| MF_INFLUX_WRITER_DB_HOST | InfluxDB host | localhost |
|
||||
| MF_INFLUX_WRITER_DB_PORT | Default port of InfluxDB database | 8086 |
|
||||
| MF_INFLUX_WRITER_DB_USER | Default user of InfluxDB database | mainflux |
|
||||
| MF_INFLUX_WRITER_DB_PASS | Default password of InfluxDB user | mainflux |
|
||||
| Variable | Description | Default |
|
||||
|--------------------------------|-----------------------------------------------------------|-----------------------|
|
||||
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
|
||||
| MF_INFLUX_WRITER_LOG_LEVEL | Log level for InfluxDB writer (debug, info, warn, error) | error |
|
||||
| MF_INFLUX_WRITER_PORT | Service HTTP port | 8180 |
|
||||
| MF_INFLUX_WRITER_BATCH_SIZE | Size of the writer points batch | 5000 |
|
||||
| MF_INFLUX_WRITER_BATCH_TIMEOUT | Time interval in seconds to flush the batch | 1 second |
|
||||
| MF_INFLUX_WRITER_DB_NAME | InfluxDB database name | mainflux |
|
||||
| MF_INFLUX_WRITER_DB_HOST | InfluxDB host | localhost |
|
||||
| MF_INFLUX_WRITER_DB_PORT | Default port of InfluxDB database | 8086 |
|
||||
| MF_INFLUX_WRITER_DB_USER | Default user of InfluxDB database | mainflux |
|
||||
| MF_INFLUX_WRITER_DB_PASS | Default password of InfluxDB user | mainflux |
|
||||
|
||||
## Deployment
|
||||
|
||||
@@ -32,6 +33,7 @@ default values.
|
||||
restart: on-failure
|
||||
environment:
|
||||
MF_NATS_URL: [NATS instance URL]
|
||||
MF_INFLUX_WRITER_LOG_LEVEL: [Influx writer log level]
|
||||
MF_INFLUX_WRITER_PORT: [Service HTTP port]
|
||||
MF_INFLUX_WRITER_BATCH_SIZE: [Size of the writer points batch]
|
||||
MF_INFLUX_WRITER_BATCH_TIMEOUT: [Time interval in seconds to flush the batch]
|
||||
@@ -60,7 +62,7 @@ make influxdb
|
||||
make install
|
||||
|
||||
# Set the environment variables and run the service
|
||||
MF_NATS_URL=[NATS instance URL] MF_INFLUX_WRITER_PORT=[Service HTTP port] MF_INFLUX_WRITER_BATCH_SIZE=[Size of the writer points batch] MF_INFLUX_WRITER_BATCH_TIMEOUT=[Time interval in seconds to flush the batch] MF_INFLUX_WRITER_DB_NAME=[InfluxDB database name] MF_INFLUX_WRITER_DB_HOST=[InfluxDB database host] MF_INFLUX_WRITER_DB_PORT=[InfluxDB database port] MF_INFLUX_WRITER_DB_USER=[InfluxDB admin user] MF_INFLUX_WRITER_DB_PASS=[InfluxDB admin password] $GOBIN/mainflux-influxdb
|
||||
MF_NATS_URL=[NATS instance URL] MF_INFLUX_WRITER_LOG_LEVEL=[Influx writer log level] MF_INFLUX_WRITER_PORT=[Service HTTP port] MF_INFLUX_WRITER_BATCH_SIZE=[Size of the writer points batch] MF_INFLUX_WRITER_BATCH_TIMEOUT=[Time interval in seconds to flush the batch] MF_INFLUX_WRITER_DB_NAME=[InfluxDB database name] MF_INFLUX_WRITER_DB_HOST=[InfluxDB database host] MF_INFLUX_WRITER_DB_PORT=[InfluxDB database port] MF_INFLUX_WRITER_DB_USER=[InfluxDB admin user] MF_INFLUX_WRITER_DB_PASS=[InfluxDB admin password] $GOBIN/mainflux-influxdb
|
||||
|
||||
```
|
||||
|
||||
|
||||
@@ -24,7 +24,7 @@ import (
|
||||
|
||||
var (
|
||||
port string
|
||||
testLog = log.New(os.Stdout)
|
||||
testLog, _ = log.New(os.Stdout, log.Info.String())
|
||||
testDB = "test"
|
||||
saveTimeout = 2 * time.Second
|
||||
saveBatchSize = 20
|
||||
@@ -32,8 +32,7 @@ var (
|
||||
client influxdata.Client
|
||||
selectMsgs = fmt.Sprintf("SELECT * FROM test..messages")
|
||||
dropMsgs = fmt.Sprintf("DROP SERIES FROM messages")
|
||||
|
||||
clientCfg = influxdata.HTTPConfig{
|
||||
clientCfg = influxdata.HTTPConfig{
|
||||
Username: "test",
|
||||
Password: "test",
|
||||
}
|
||||
|
||||
@@ -8,13 +8,14 @@ The service is configured using the environment variables presented in the
|
||||
following table. Note that any unset variables will be replaced with their
|
||||
default values.
|
||||
|
||||
| Variable | Description | Default |
|
||||
|-------------------------|-----------------------------------|-----------------------|
|
||||
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
|
||||
| MF_MONGO_WRITER_PORT | Service HTTP port | 8180 |
|
||||
| MF_MONGO_WRITER_DB_NAME | Default MongoDB database name | mainflux |
|
||||
| MF_MONGO_WRITER_DB_HOST | Default MongoDB database host | localhost |
|
||||
| MF_MONGO_WRITER_DB_PORT | Default MongoDB database port | 27017 |
|
||||
| Variable | Description | Default |
|
||||
|--------------------------------|--------------------------------|-----------------------|
|
||||
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
|
||||
| MF_MONGO_WRITER_LOG_LEVEL | Log level for MongoDB writer | error |
|
||||
| MF_MONGO_WRITER_PORT | Service HTTP port | 8180 |
|
||||
| MF_MONGO_WRITER_DB_NAME | Default MongoDB database name | mainflux |
|
||||
| MF_MONGO_WRITER_DB_HOST | Default MongoDB database host | localhost |
|
||||
| MF_MONGO_WRITER_DB_PORT | Default MongoDB database port | 27017 |
|
||||
|
||||
## Deployment
|
||||
|
||||
@@ -30,6 +31,7 @@ default values.
|
||||
restart: on-failure
|
||||
environment:
|
||||
MF_NATS_URL: [NATS instance URL]
|
||||
MF_MONGO_WRITER_LOG_LEVEL: [MongoDB writer log level]
|
||||
MF_MONGO_WRITER_PORT: [Service HTTP port]
|
||||
MF_MONGO_WRITER_DB_NAME: [MongoDB name]
|
||||
MF_MONGO_WRITER_DB_HOST: [MongoDB host]
|
||||
@@ -54,7 +56,7 @@ make mongodb-writer
|
||||
make install
|
||||
|
||||
# Set the environment variables and run the service
|
||||
MF_NATS_URL=[NATS instance URL] MF_MONGO_WRITER_PORT=[Service HTTP port] MF_MONGO_WRITER_DB_NAME=[MongoDB database name] MF_MONGO_WRITER_DB_HOST=[MongoDB database host] MF_MONGO_WRITER_DB_PORT=[MongoDB database port] $GOBIN/mainflux-mongodb-writer
|
||||
MF_NATS_URL=[NATS instance URL] MF_MONGO_WRITER_LOG_LEVEL=[MongoDB writer log level] MF_MONGO_WRITER_PORT=[Service HTTP port] MF_MONGO_WRITER_DB_NAME=[MongoDB database name] MF_MONGO_WRITER_DB_HOST=[MongoDB database host] MF_MONGO_WRITER_DB_PORT=[MongoDB database port] $GOBIN/mainflux-mongodb-writer
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
@@ -26,7 +26,7 @@ import (
|
||||
var (
|
||||
port string
|
||||
addr string
|
||||
testLog = log.New(os.Stdout)
|
||||
testLog, _ = log.New(os.Stdout, log.Info.String())
|
||||
testDB = "test"
|
||||
collection = "mainflux"
|
||||
db mongo.Database
|
||||
|
||||
+8
-6
@@ -8,11 +8,12 @@ The service is configured using the environment variables presented in the
|
||||
following table. Note that any unset variables will be replaced with their
|
||||
default values.
|
||||
|
||||
| Variable | Description | Default |
|
||||
|----------------------|---------------------|-----------------------|
|
||||
| MF_WS_ADAPTER_PORT | Service WS port | 8180 |
|
||||
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
|
||||
| MF_THINGS_URL | Things service URL | localhost:8181 |
|
||||
| Variable | Description | Default |
|
||||
|---------------------------|-------------------------------|-----------------------|
|
||||
| MF_WS_ADAPTER_LOG_LEVEL | Log level for the WS Adapter | error |
|
||||
| MF_WS_ADAPTER_PORT | Service WS port | 8180 |
|
||||
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
|
||||
| MF_THINGS_URL | Things service URL | localhost:8181 |
|
||||
|
||||
## Deployment
|
||||
|
||||
@@ -31,6 +32,7 @@ services:
|
||||
MF_THINGS_URL: [Things service URL]
|
||||
MF_NATS_URL: [NATS instance URL]
|
||||
MF_WS_ADAPTER_PORT: [Service WS port]
|
||||
MF_WS_ADAPTER_LOG_LEVEL: [WS adapter log level]
|
||||
```
|
||||
|
||||
To start the service outside of the container, execute the following shell script:
|
||||
@@ -48,7 +50,7 @@ make ws
|
||||
make install
|
||||
|
||||
# set the environment variables and run the service
|
||||
MF_THINGS_URL=[Things service URL] MF_NATS_URL=[NATS instance URL] MF_WS_ADAPTER_PORT=[Service WS port] $GOBIN/mainflux-ws
|
||||
MF_THINGS_URL=[Things service URL] MF_NATS_URL=[NATS instance URL] MF_WS_ADAPTER_PORT=[Service WS port] MF_WS_ADAPTER_LOG_LEVEL=[WS adapter log level] $GOBIN/mainflux-ws
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
@@ -43,7 +43,8 @@ func newService() ws.Service {
|
||||
}
|
||||
|
||||
func newHTTPServer(svc ws.Service, tc mainflux.ThingsServiceClient) *httptest.Server {
|
||||
mux := api.MakeHandler(svc, tc, log.New(os.Stdout))
|
||||
logger, _ := log.New(os.Stdout, log.Info.String())
|
||||
mux := api.MakeHandler(svc, tc, logger)
|
||||
return httptest.NewServer(mux)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user