mirror of
https://github.com/absmach/magistrala.git
synced 2026-06-23 04:10:28 +00:00
MF-174 - Add logger interface and go-kit logger implementation (#212)
* Add logger interface and go-kit logger implementation Add wrapper logger interface and wrap go-kit logger in it. Add tests for info, warn, error and log methods. Add log filtering by level. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Refactor logger and replace go-kit logger with wrapper Replace go-kit logger from services with logger wrapper. Refactor code in logger wrapper. Remove unnecessary subpackage, methods and log levels. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>
This commit is contained in:
committed by
Dejan Mijić
parent
301d855015
commit
88b30626dd
+6
-6
@@ -7,12 +7,12 @@ import (
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
|
||||
"github.com/mainflux/mainflux"
|
||||
adapter "github.com/mainflux/mainflux/http"
|
||||
"github.com/mainflux/mainflux/http/api"
|
||||
"github.com/mainflux/mainflux/http/nats"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
manager "github.com/mainflux/mainflux/manager/client"
|
||||
broker "github.com/nats-io/go-nats"
|
||||
stdprometheus "github.com/prometheus/client_golang/prometheus"
|
||||
@@ -40,12 +40,11 @@ func main() {
|
||||
Port: mainflux.Env(envPort, defPort),
|
||||
}
|
||||
|
||||
logger := log.NewJSONLogger(log.NewSyncWriter(os.Stdout))
|
||||
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
|
||||
logger := log.New(os.Stdout)
|
||||
|
||||
nc, err := broker.Connect(cfg.NatsURL)
|
||||
if err != nil {
|
||||
logger.Log("error", err)
|
||||
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
defer nc.Close()
|
||||
@@ -75,7 +74,7 @@ func main() {
|
||||
go func() {
|
||||
p := fmt.Sprintf(":%s", cfg.Port)
|
||||
mc := manager.NewClient(cfg.ManagerURL)
|
||||
logger.Log("message", fmt.Sprintf("HTTP adapter service started, exposed port %s", cfg.Port))
|
||||
logger.Info(fmt.Sprintf("HTTP adapter service started, exposed port %s", cfg.Port))
|
||||
errs <- http.ListenAndServe(p, api.MakeHandler(svc, mc))
|
||||
}()
|
||||
|
||||
@@ -85,5 +84,6 @@ func main() {
|
||||
errs <- fmt.Errorf("%s", <-c)
|
||||
}()
|
||||
|
||||
logger.Log("terminated", <-errs)
|
||||
err = <-errs
|
||||
logger.Error(fmt.Sprintf("HTTP adapter terminated: %s", err))
|
||||
}
|
||||
|
||||
+6
-6
@@ -7,9 +7,9 @@ import (
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/manager"
|
||||
"github.com/mainflux/mainflux/manager/api"
|
||||
"github.com/mainflux/mainflux/manager/bcrypt"
|
||||
@@ -56,12 +56,11 @@ func main() {
|
||||
Secret: mainflux.Env(envSecret, defSecret),
|
||||
}
|
||||
|
||||
logger := log.NewJSONLogger(log.NewSyncWriter(os.Stdout))
|
||||
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
|
||||
logger := log.New(os.Stdout)
|
||||
|
||||
db, err := postgres.Connect(cfg.DBHost, cfg.DBPort, cfg.DBName, cfg.DBUser, cfg.DBPass)
|
||||
if err != nil {
|
||||
logger.Log("error", err)
|
||||
logger.Error(fmt.Sprintf("Failed to connect to postgres: %s", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
defer db.Close()
|
||||
@@ -94,7 +93,7 @@ func main() {
|
||||
|
||||
go func() {
|
||||
p := fmt.Sprintf(":%s", cfg.Port)
|
||||
logger.Log("message", fmt.Sprintf("Manager service started, exposed port %s", cfg.Port))
|
||||
logger.Info(fmt.Sprintf("Manager service started, exposed port %s", cfg.Port))
|
||||
errs <- http.ListenAndServe(p, api.MakeHandler(svc))
|
||||
}()
|
||||
|
||||
@@ -104,5 +103,6 @@ func main() {
|
||||
errs <- fmt.Errorf("%s", <-c)
|
||||
}()
|
||||
|
||||
logger.Log("terminated", <-errs)
|
||||
err = <-errs
|
||||
logger.Error(fmt.Sprintf("Manager service terminated: %s", err))
|
||||
}
|
||||
|
||||
@@ -7,8 +7,8 @@ import (
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/normalizer"
|
||||
nats "github.com/nats-io/go-nats"
|
||||
|
||||
@@ -34,12 +34,11 @@ func main() {
|
||||
Port: mainflux.Env(envPort, defPort),
|
||||
}
|
||||
|
||||
logger := log.NewJSONLogger(log.NewSyncWriter(os.Stdout))
|
||||
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
|
||||
logger := log.New(os.Stdout)
|
||||
|
||||
nc, err := nats.Connect(cfg.NatsURL)
|
||||
if err != nil {
|
||||
logger.Log("error", fmt.Sprintf("Failed to connect: %s", err))
|
||||
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
|
||||
os.Exit(1)
|
||||
}
|
||||
defer nc.Close()
|
||||
@@ -48,7 +47,7 @@ func main() {
|
||||
|
||||
go func() {
|
||||
p := fmt.Sprintf(":%s", cfg.Port)
|
||||
logger.Log("message", fmt.Sprintf("Normalizer service started, exposed port %s", cfg.Port))
|
||||
logger.Info(fmt.Sprintf("Normalizer service started, exposed port %s", cfg.Port))
|
||||
errs <- http.ListenAndServe(p, normalizer.MakeHandler())
|
||||
}()
|
||||
|
||||
@@ -73,5 +72,7 @@ func main() {
|
||||
}, []string{"method"})
|
||||
|
||||
normalizer.Subscribe(nc, logger, counter, latency)
|
||||
logger.Log("terminated", <-errs)
|
||||
|
||||
err = <-errs
|
||||
logger.Error(fmt.Sprintf("Normalizer service terminated: %s", err))
|
||||
}
|
||||
|
||||
+9
-6
@@ -1,10 +1,11 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
)
|
||||
|
||||
var _ mainflux.MessagePublisher = (*loggingMiddleware)(nil)
|
||||
@@ -19,12 +20,14 @@ func LoggingMiddleware(svc mainflux.MessagePublisher, logger log.Logger) mainflu
|
||||
return &loggingMiddleware{logger, svc}
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) Publish(msg mainflux.RawMessage) error {
|
||||
func (lm *loggingMiddleware) Publish(msg mainflux.RawMessage) (err error) {
|
||||
defer func(begin time.Time) {
|
||||
lm.logger.Log(
|
||||
"method", "publish",
|
||||
"took", time.Since(begin),
|
||||
)
|
||||
message := fmt.Sprintf("Method publish took %s to complete", time.Since(begin))
|
||||
if err != nil {
|
||||
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
|
||||
return
|
||||
}
|
||||
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
|
||||
}(time.Now())
|
||||
|
||||
return lm.svc.Publish(msg)
|
||||
|
||||
@@ -0,0 +1,3 @@
|
||||
// Package logger contains logger API definition, wrapper that
|
||||
// can be used around any other logger.
|
||||
package logger
|
||||
@@ -0,0 +1,23 @@
|
||||
package logger
|
||||
|
||||
const (
|
||||
// Error level is used when logging errors.
|
||||
Error Level = iota + 1
|
||||
// Warn level is used when logging warnings.
|
||||
Warn
|
||||
// Info level is used when logging info data.
|
||||
Info
|
||||
)
|
||||
|
||||
// Level represents severity level while logging.
|
||||
type Level int
|
||||
|
||||
var levels = map[Level]string{
|
||||
Error: "error",
|
||||
Warn: "warn",
|
||||
Info: "info",
|
||||
}
|
||||
|
||||
func (lvl Level) String() string {
|
||||
return levels[lvl]
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
package logger
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
)
|
||||
|
||||
// Logger specifies logging API.
|
||||
type Logger interface {
|
||||
// Info logs any object in JSON format on info level.
|
||||
Info(string)
|
||||
// Warn logs any object in JSON format on warning level.
|
||||
Warn(string)
|
||||
// Error logs any object in JSON format on error level.
|
||||
Error(string)
|
||||
}
|
||||
|
||||
var _ Logger = (*logger)(nil)
|
||||
|
||||
type logger struct {
|
||||
kitLogger log.Logger
|
||||
}
|
||||
|
||||
// New returns wrapped go kit logger.
|
||||
func New(out io.Writer) Logger {
|
||||
l := log.NewJSONLogger(log.NewSyncWriter(out))
|
||||
l = log.With(l, "ts", log.DefaultTimestampUTC)
|
||||
return &logger{l}
|
||||
}
|
||||
|
||||
func (l logger) Info(msg string) {
|
||||
l.kitLogger.Log("level", Info.String(), "message", msg)
|
||||
}
|
||||
|
||||
func (l logger) Warn(msg string) {
|
||||
l.kitLogger.Log("level", Warn.String(), "message", msg)
|
||||
}
|
||||
|
||||
func (l logger) Error(msg string) {
|
||||
l.kitLogger.Log("level", Error.String(), "message", msg)
|
||||
}
|
||||
@@ -0,0 +1,93 @@
|
||||
package logger_test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
var _ io.Writer = (*mockWriter)(nil)
|
||||
|
||||
type mockWriter struct {
|
||||
value []byte
|
||||
}
|
||||
|
||||
func (writer *mockWriter) Write(p []byte) (int, error) {
|
||||
writer.value = p
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
func (writer *mockWriter) Read() (logMsg, error) {
|
||||
var output logMsg
|
||||
err := json.Unmarshal(writer.value, &output)
|
||||
return output, err
|
||||
}
|
||||
|
||||
type logMsg struct {
|
||||
Level string `json:"level"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
func TestInfo(t *testing.T) {
|
||||
cases := map[string]struct {
|
||||
input string
|
||||
output logMsg
|
||||
}{
|
||||
"info log ordinary string": {"input_string", logMsg{log.Info.String(), "input_string"}},
|
||||
"info log empty string": {"", logMsg{log.Info.String(), ""}},
|
||||
}
|
||||
|
||||
writer := mockWriter{}
|
||||
logger := log.New(&writer)
|
||||
|
||||
for desc, tc := range cases {
|
||||
logger.Info(tc.input)
|
||||
output, err := writer.Read()
|
||||
assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", desc, err))
|
||||
assert.Equal(t, tc.output, output, fmt.Sprintf("%s: expected %s got %s", desc, tc.output, output))
|
||||
}
|
||||
}
|
||||
|
||||
func TestWarn(t *testing.T) {
|
||||
cases := map[string]struct {
|
||||
input string
|
||||
output logMsg
|
||||
}{
|
||||
"warn log ordinary string": {"input_string", logMsg{log.Warn.String(), "input_string"}},
|
||||
"warn log empty string": {"", logMsg{log.Warn.String(), ""}},
|
||||
}
|
||||
|
||||
writer := mockWriter{}
|
||||
logger := log.New(&writer)
|
||||
|
||||
for desc, tc := range cases {
|
||||
logger.Warn(tc.input)
|
||||
output, err := writer.Read()
|
||||
assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", desc, err))
|
||||
assert.Equal(t, tc.output, output, fmt.Sprintf("%s: expected %s got %s", desc, tc.output, output))
|
||||
}
|
||||
}
|
||||
|
||||
func TestError(t *testing.T) {
|
||||
cases := map[string]struct {
|
||||
input string
|
||||
output logMsg
|
||||
}{
|
||||
"error log ordinary string": {"input_string", logMsg{log.Error.String(), "input_string"}},
|
||||
"error log empty string": {"", logMsg{log.Error.String(), ""}},
|
||||
}
|
||||
|
||||
writer := mockWriter{}
|
||||
logger := log.New(&writer)
|
||||
|
||||
for desc, tc := range cases {
|
||||
logger.Error(tc.input)
|
||||
output, err := writer.Read()
|
||||
assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", desc, err))
|
||||
assert.Equal(t, tc.output, output, fmt.Sprintf("%s: expected %s got %s", desc, tc.output, output))
|
||||
}
|
||||
}
|
||||
+103
-115
@@ -1,9 +1,10 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/manager"
|
||||
)
|
||||
|
||||
@@ -21,12 +22,13 @@ func LoggingMiddleware(svc manager.Service, logger log.Logger) manager.Service {
|
||||
|
||||
func (lm *loggingMiddleware) Register(user manager.User) (err error) {
|
||||
defer func(begin time.Time) {
|
||||
lm.logger.Log(
|
||||
"method", "register",
|
||||
"email", user.Email,
|
||||
"error", err,
|
||||
"took", time.Since(begin),
|
||||
)
|
||||
message := fmt.Sprintf("Method register for user %s took %s to complete", user.Email, time.Since(begin))
|
||||
if err != nil {
|
||||
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
|
||||
return
|
||||
}
|
||||
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
|
||||
|
||||
}(time.Now())
|
||||
|
||||
return lm.svc.Register(user)
|
||||
@@ -34,12 +36,12 @@ func (lm *loggingMiddleware) Register(user manager.User) (err error) {
|
||||
|
||||
func (lm *loggingMiddleware) Login(user manager.User) (token string, err error) {
|
||||
defer func(begin time.Time) {
|
||||
lm.logger.Log(
|
||||
"method", "login",
|
||||
"email", user.Email,
|
||||
"error", err,
|
||||
"took", time.Since(begin),
|
||||
)
|
||||
message := fmt.Sprintf("Method login for user %s took %s to complete", user.Email, time.Since(begin))
|
||||
if err != nil {
|
||||
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
|
||||
return
|
||||
}
|
||||
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
|
||||
}(time.Now())
|
||||
|
||||
return lm.svc.Login(user)
|
||||
@@ -47,13 +49,12 @@ func (lm *loggingMiddleware) Login(user manager.User) (token string, err error)
|
||||
|
||||
func (lm *loggingMiddleware) AddClient(key string, client manager.Client) (id string, err error) {
|
||||
defer func(begin time.Time) {
|
||||
lm.logger.Log(
|
||||
"method", "add_client",
|
||||
"key", key,
|
||||
"id", id,
|
||||
"error", err,
|
||||
"took", time.Since(begin),
|
||||
)
|
||||
message := fmt.Sprintf("Method add_client for key %s and client %s took %s to complete", key, id, time.Since(begin))
|
||||
if err != nil {
|
||||
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
|
||||
return
|
||||
}
|
||||
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
|
||||
}(time.Now())
|
||||
|
||||
return lm.svc.AddClient(key, client)
|
||||
@@ -61,13 +62,12 @@ func (lm *loggingMiddleware) AddClient(key string, client manager.Client) (id st
|
||||
|
||||
func (lm *loggingMiddleware) UpdateClient(key string, client manager.Client) (err error) {
|
||||
defer func(begin time.Time) {
|
||||
lm.logger.Log(
|
||||
"method", "update_client",
|
||||
"key", key,
|
||||
"id", client.ID,
|
||||
"error", err,
|
||||
"took", time.Since(begin),
|
||||
)
|
||||
message := fmt.Sprintf("Method update_client for key %s and client %s took %s to complete", key, client.ID, time.Since(begin))
|
||||
if err != nil {
|
||||
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
|
||||
return
|
||||
}
|
||||
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
|
||||
}(time.Now())
|
||||
|
||||
return lm.svc.UpdateClient(key, client)
|
||||
@@ -75,13 +75,12 @@ func (lm *loggingMiddleware) UpdateClient(key string, client manager.Client) (er
|
||||
|
||||
func (lm *loggingMiddleware) ViewClient(key string, id string) (client manager.Client, err error) {
|
||||
defer func(begin time.Time) {
|
||||
lm.logger.Log(
|
||||
"method", "view_client",
|
||||
"key", key,
|
||||
"id", id,
|
||||
"error", err,
|
||||
"took", time.Since(begin),
|
||||
)
|
||||
message := fmt.Sprintf("Method view_client for key %s and client %s took %s to complete", key, id, time.Since(begin))
|
||||
if err != nil {
|
||||
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
|
||||
return
|
||||
}
|
||||
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
|
||||
}(time.Now())
|
||||
|
||||
return lm.svc.ViewClient(key, id)
|
||||
@@ -89,12 +88,12 @@ func (lm *loggingMiddleware) ViewClient(key string, id string) (client manager.C
|
||||
|
||||
func (lm *loggingMiddleware) ListClients(key string) (clients []manager.Client, err error) {
|
||||
defer func(begin time.Time) {
|
||||
lm.logger.Log(
|
||||
"method", "list_clients",
|
||||
"key", key,
|
||||
"error", err,
|
||||
"took", time.Since(begin),
|
||||
)
|
||||
message := fmt.Sprintf("Method list_clients for key %s took %s to complete", key, time.Since(begin))
|
||||
if err != nil {
|
||||
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
|
||||
return
|
||||
}
|
||||
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
|
||||
}(time.Now())
|
||||
|
||||
return lm.svc.ListClients(key)
|
||||
@@ -102,13 +101,12 @@ func (lm *loggingMiddleware) ListClients(key string) (clients []manager.Client,
|
||||
|
||||
func (lm *loggingMiddleware) RemoveClient(key string, id string) (err error) {
|
||||
defer func(begin time.Time) {
|
||||
lm.logger.Log(
|
||||
"method", "remove_client",
|
||||
"key", key,
|
||||
"id", id,
|
||||
"error", err,
|
||||
"took", time.Since(begin),
|
||||
)
|
||||
message := fmt.Sprintf("Method remove_client for key %s and client %s took %s to complete", key, id, time.Since(begin))
|
||||
if err != nil {
|
||||
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
|
||||
return
|
||||
}
|
||||
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
|
||||
}(time.Now())
|
||||
|
||||
return lm.svc.RemoveClient(key, id)
|
||||
@@ -116,13 +114,12 @@ func (lm *loggingMiddleware) RemoveClient(key string, id string) (err error) {
|
||||
|
||||
func (lm *loggingMiddleware) CreateChannel(key string, channel manager.Channel) (id string, err error) {
|
||||
defer func(begin time.Time) {
|
||||
lm.logger.Log(
|
||||
"method", "create_channel",
|
||||
"key", key,
|
||||
"id", id,
|
||||
"error", err,
|
||||
"took", time.Since(begin),
|
||||
)
|
||||
message := fmt.Sprintf("Method create_channel for key %s and channel %s took %s to complete", key, id, time.Since(begin))
|
||||
if err != nil {
|
||||
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
|
||||
return
|
||||
}
|
||||
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
|
||||
}(time.Now())
|
||||
|
||||
return lm.svc.CreateChannel(key, channel)
|
||||
@@ -130,13 +127,12 @@ func (lm *loggingMiddleware) CreateChannel(key string, channel manager.Channel)
|
||||
|
||||
func (lm *loggingMiddleware) UpdateChannel(key string, channel manager.Channel) (err error) {
|
||||
defer func(begin time.Time) {
|
||||
lm.logger.Log(
|
||||
"method", "update_channel",
|
||||
"key", key,
|
||||
"id", channel.ID,
|
||||
"error", err,
|
||||
"took", time.Since(begin),
|
||||
)
|
||||
message := fmt.Sprintf("Method update_channel for key %s and channel %s took %s to complete", key, channel.ID, time.Since(begin))
|
||||
if err != nil {
|
||||
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
|
||||
return
|
||||
}
|
||||
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
|
||||
}(time.Now())
|
||||
|
||||
return lm.svc.UpdateChannel(key, channel)
|
||||
@@ -144,13 +140,12 @@ func (lm *loggingMiddleware) UpdateChannel(key string, channel manager.Channel)
|
||||
|
||||
func (lm *loggingMiddleware) ViewChannel(key string, id string) (channel manager.Channel, err error) {
|
||||
defer func(begin time.Time) {
|
||||
lm.logger.Log(
|
||||
"method", "view_channel",
|
||||
"key", key,
|
||||
"id", id,
|
||||
"error", err,
|
||||
"took", time.Since(begin),
|
||||
)
|
||||
message := fmt.Sprintf("Method view_channel for key %s and channel %s took %s to complete", key, id, time.Since(begin))
|
||||
if err != nil {
|
||||
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
|
||||
return
|
||||
}
|
||||
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
|
||||
}(time.Now())
|
||||
|
||||
return lm.svc.ViewChannel(key, id)
|
||||
@@ -158,12 +153,12 @@ func (lm *loggingMiddleware) ViewChannel(key string, id string) (channel manager
|
||||
|
||||
func (lm *loggingMiddleware) ListChannels(key string) (channels []manager.Channel, err error) {
|
||||
defer func(begin time.Time) {
|
||||
lm.logger.Log(
|
||||
"method", "list_channels",
|
||||
"key", key,
|
||||
"error", err,
|
||||
"took", time.Since(begin),
|
||||
)
|
||||
message := fmt.Sprintf("Method list_channels for key %s took %s to complete", key, time.Since(begin))
|
||||
if err != nil {
|
||||
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
|
||||
return
|
||||
}
|
||||
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
|
||||
}(time.Now())
|
||||
|
||||
return lm.svc.ListChannels(key)
|
||||
@@ -171,56 +166,51 @@ func (lm *loggingMiddleware) ListChannels(key string) (channels []manager.Channe
|
||||
|
||||
func (lm *loggingMiddleware) RemoveChannel(key string, id string) (err error) {
|
||||
defer func(begin time.Time) {
|
||||
lm.logger.Log(
|
||||
"method", "remove_channel",
|
||||
"key", key,
|
||||
"id", id,
|
||||
"error", err,
|
||||
"took", time.Since(begin),
|
||||
)
|
||||
message := fmt.Sprintf("Method remove_channel for key %s and channel %s took %s to complete", key, id, time.Since(begin))
|
||||
if err != nil {
|
||||
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
|
||||
return
|
||||
}
|
||||
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
|
||||
}(time.Now())
|
||||
|
||||
return lm.svc.RemoveChannel(key, id)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) Connect(key, chanId, clientId string) (err error) {
|
||||
func (lm *loggingMiddleware) Connect(key, chanID, clientID string) (err error) {
|
||||
defer func(begin time.Time) {
|
||||
lm.logger.Log(
|
||||
"method", "connect",
|
||||
"key", key,
|
||||
"channel", chanId,
|
||||
"client", clientId,
|
||||
"error", err,
|
||||
"took", time.Since(begin),
|
||||
)
|
||||
message := fmt.Sprintf("Method connect for key %s, channel %s, client %s took %s to complete", key, chanID, clientID, time.Since(begin))
|
||||
if err != nil {
|
||||
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
|
||||
return
|
||||
}
|
||||
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
|
||||
}(time.Now())
|
||||
|
||||
return lm.svc.Connect(key, chanId, clientId)
|
||||
return lm.svc.Connect(key, chanID, clientID)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) Disconnect(key, chanId, clientId string) (err error) {
|
||||
func (lm *loggingMiddleware) Disconnect(key, chanID, clientID string) (err error) {
|
||||
defer func(begin time.Time) {
|
||||
lm.logger.Log(
|
||||
"method", "disconnect",
|
||||
"key", key,
|
||||
"channel", chanId,
|
||||
"client", clientId,
|
||||
"error", err,
|
||||
"took", time.Since(begin),
|
||||
)
|
||||
message := fmt.Sprintf("Method disconnect for key %s, channel %s, client %s took %s to complete", key, chanID, clientID, time.Since(begin))
|
||||
if err != nil {
|
||||
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
|
||||
return
|
||||
}
|
||||
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
|
||||
}(time.Now())
|
||||
|
||||
return lm.svc.Disconnect(key, chanId, clientId)
|
||||
return lm.svc.Disconnect(key, chanID, clientID)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) Identity(key string) (id string, err error) {
|
||||
defer func(begin time.Time) {
|
||||
lm.logger.Log(
|
||||
"method", "identity",
|
||||
"id", id,
|
||||
"error", err,
|
||||
"took", time.Since(begin),
|
||||
)
|
||||
message := fmt.Sprintf("Method identity for client %s took %s to complete", id, time.Since(begin))
|
||||
if err != nil {
|
||||
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
|
||||
return
|
||||
}
|
||||
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
|
||||
}(time.Now())
|
||||
|
||||
return lm.svc.Identity(key)
|
||||
@@ -228,14 +218,12 @@ func (lm *loggingMiddleware) Identity(key string) (id string, err error) {
|
||||
|
||||
func (lm *loggingMiddleware) CanAccess(key string, id string) (pub string, err error) {
|
||||
defer func(begin time.Time) {
|
||||
lm.logger.Log(
|
||||
"method", "can_access",
|
||||
"key", key,
|
||||
"id", id,
|
||||
"publisher", pub,
|
||||
"error", err,
|
||||
"took", time.Since(begin),
|
||||
)
|
||||
message := fmt.Sprintf("Method can_access for key %s, channel %s and publisher %s took %s to complete", key, id, pub, time.Since(begin))
|
||||
if err != nil {
|
||||
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
|
||||
return
|
||||
}
|
||||
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
|
||||
}(time.Now())
|
||||
|
||||
return lm.svc.CanAccess(key, id)
|
||||
|
||||
@@ -4,10 +4,10 @@ import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cisco/senml"
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/go-kit/kit/metrics"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/mainflux/mainflux"
|
||||
log "github.com/mainflux/mainflux/logger"
|
||||
nats "github.com/nats-io/go-nats"
|
||||
)
|
||||
|
||||
@@ -33,12 +33,12 @@ func (ef eventFlow) handleMsg(m *nats.Msg) {
|
||||
msg := mainflux.RawMessage{}
|
||||
|
||||
if err := proto.Unmarshal(m.Data, &msg); err != nil {
|
||||
ef.logger.Log("error", fmt.Sprintf("Unmarshalling failed: %s", err))
|
||||
ef.logger.Warn(fmt.Sprintf("Unmarshalling failed: %s", err))
|
||||
return
|
||||
}
|
||||
|
||||
if err := ef.publish(msg); err != nil {
|
||||
ef.logger.Log("error", fmt.Sprintf("Publishing failed: %s", err))
|
||||
ef.logger.Warn(fmt.Sprintf("Publishing failed: %s", err))
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -46,19 +46,19 @@ func (ef eventFlow) handleMsg(m *nats.Msg) {
|
||||
func (ef eventFlow) publish(msg mainflux.RawMessage) error {
|
||||
normalized, err := ef.normalize(msg)
|
||||
if err != nil {
|
||||
ef.logger.Log("error", fmt.Sprintf("Normalization failed: %s", err))
|
||||
ef.logger.Warn(fmt.Sprintf("Normalization failed: %s", err))
|
||||
return err
|
||||
}
|
||||
|
||||
for _, v := range normalized {
|
||||
data, err := proto.Marshal(&v)
|
||||
if err != nil {
|
||||
ef.logger.Log("error", fmt.Sprintf("Marshalling failed: %s", err))
|
||||
ef.logger.Warn(fmt.Sprintf("Marshalling failed: %s", err))
|
||||
return err
|
||||
}
|
||||
|
||||
if err = ef.nc.Publish(output, data); err != nil {
|
||||
ef.logger.Log("error", fmt.Sprintf("Publishing failed: %s", err))
|
||||
ef.logger.Warn(fmt.Sprintf("Publishing failed: %s", err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user