mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 07:30:25 +00:00
NOISSUE - Fix RE logging (#176)
Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>
This commit is contained in:
+6
-10
@@ -190,7 +190,7 @@ func main() {
|
||||
}
|
||||
defer authnClient.Close()
|
||||
logger.Info("AuthN successfully connected to auth gRPC server " + authnClient.Secure())
|
||||
errs := make(chan error, channBuffer)
|
||||
runInfo := make(chan re.RunInfo, channBuffer)
|
||||
|
||||
domsGrpcCfg := grpcclient.Config{}
|
||||
if err := env.ParseWithOptions(&domsGrpcCfg, env.Options{Prefix: envPrefixDomains}); err != nil {
|
||||
@@ -233,7 +233,7 @@ func main() {
|
||||
readersClient := grpcClient.NewReadersClient(client.Connection(), regrpcCfg.Timeout)
|
||||
logger.Info("Readers gRPC client successfully connected to readers gRPC server " + client.Secure())
|
||||
|
||||
svc, err := newService(database, errs, msgSub, writersPub, alarmsPub, authz, ec, logger, readersClient)
|
||||
svc, err := newService(database, runInfo, msgSub, writersPub, alarmsPub, authz, ec, logger, readersClient)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("failed to create services: %s", err))
|
||||
exitCode = 1
|
||||
@@ -254,12 +254,8 @@ func main() {
|
||||
}
|
||||
|
||||
go func() {
|
||||
for err := range errs {
|
||||
if err != nil {
|
||||
logger.Warn("Error handling rule", slog.String("error", err.Error()))
|
||||
continue
|
||||
}
|
||||
logger.Info("Handling rule completed successfully")
|
||||
for info := range runInfo {
|
||||
logger.LogAttrs(context.Background(), info.Level, info.Message, info.Details...)
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -289,7 +285,7 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func newService(db pgclient.Database, errs chan error, rePubSub messaging.PubSub, writersPub, alarmsPub messaging.Publisher, authz mgauthz.Authorization, ec email.Config, logger *slog.Logger, readersClient grpcReadersV1.ReadersServiceClient) (re.Service, error) {
|
||||
func newService(db pgclient.Database, runInfo chan re.RunInfo, rePubSub messaging.PubSub, writersPub, alarmsPub messaging.Publisher, authz mgauthz.Authorization, ec email.Config, logger *slog.Logger, readersClient grpcReadersV1.ReadersServiceClient) (re.Service, error) {
|
||||
repo := repg.NewRepository(db)
|
||||
idp := uuid.New()
|
||||
|
||||
@@ -298,7 +294,7 @@ func newService(db pgclient.Database, errs chan error, rePubSub messaging.PubSub
|
||||
logger.Error(fmt.Sprintf("failed to configure e-mailing util: %s", err.Error()))
|
||||
}
|
||||
|
||||
csvc := re.NewService(repo, errs, idp, rePubSub, writersPub, alarmsPub, re.NewTicker(time.Second*30), emailerClient, readersClient)
|
||||
csvc := re.NewService(repo, runInfo, idp, rePubSub, writersPub, alarmsPub, re.NewTicker(time.Second*30), emailerClient, readersClient)
|
||||
csvc, err = middleware.AuthorizationMiddleware(csvc, authz)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
+44
-20
@@ -5,6 +5,7 @@ package re
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
@@ -37,14 +38,14 @@ func (re *re) Handle(msg *messaging.Message) error {
|
||||
|
||||
for _, r := range page.Rules {
|
||||
go func(ctx context.Context) {
|
||||
re.errors <- re.process(ctx, r, msg)
|
||||
re.runInfo <- re.process(ctx, r, msg)
|
||||
}(ctx)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (re *re) process(ctx context.Context, r Rule, msg *messaging.Message) error {
|
||||
func (re *re) process(ctx context.Context, r Rule, msg *messaging.Message) RunInfo {
|
||||
l := lua.NewState()
|
||||
defer l.Close()
|
||||
preload(l)
|
||||
@@ -59,38 +60,37 @@ func (re *re) process(ctx context.Context, r Rule, msg *messaging.Message) error
|
||||
l.SetGlobal("aes_encrypt", l.NewFunction(luaEncrypt))
|
||||
l.SetGlobal("aes_decrypt", l.NewFunction(luaDecrypt))
|
||||
|
||||
details := []slog.Attr{
|
||||
slog.String("domain_id", r.DomainID),
|
||||
slog.String("rule_id", r.ID),
|
||||
slog.String("rule_name", r.Name),
|
||||
slog.Time("time", time.Now().UTC()),
|
||||
}
|
||||
if err := l.DoString(r.Logic.Value); err != nil {
|
||||
return err
|
||||
return RunInfo{Level: slog.LevelError, Message: "failed to run rule logic" + err.Error(), Details: details}
|
||||
}
|
||||
// Get the last result.
|
||||
result := l.Get(-1)
|
||||
if result == lua.LNil {
|
||||
return nil
|
||||
return RunInfo{Level: slog.LevelWarn, Message: "rule with nil script result", Details: details}
|
||||
}
|
||||
// Converting Lua is an expensive operation, so
|
||||
// don't do it if there are no outputs.
|
||||
if len(r.Logic.Outputs) == 0 {
|
||||
return nil
|
||||
return RunInfo{Level: slog.LevelWarn, Message: "rule with no output channels", Details: details}
|
||||
}
|
||||
var err error
|
||||
res := convertLua(result)
|
||||
for _, o := range r.Logic.Outputs {
|
||||
// If value is false, don't run the follow-up.
|
||||
if v, ok := res.(bool); ok && !v {
|
||||
return nil
|
||||
return RunInfo{Level: slog.LevelInfo, Message: err.Error(), Details: details}
|
||||
}
|
||||
if e := re.handleOutput(ctx, o, r, msg, res); e != nil {
|
||||
err = errors.Wrap(e, err)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (re *re) processReportConfig(ctx context.Context, cfg ReportConfig) error {
|
||||
if _, err := re.generateReport(ctx, cfg, EmailReport); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return RunInfo{Level: slog.LevelInfo, Message: "rule processed successfully", Details: details}
|
||||
}
|
||||
|
||||
func (re *re) handleOutput(ctx context.Context, o ScriptOutput, r Rule, msg *messaging.Message, val interface{}) error {
|
||||
@@ -125,14 +125,19 @@ func (re *re) StartScheduler(ctx context.Context) error {
|
||||
|
||||
page, err := re.repo.ListRules(ctx, pm)
|
||||
if err != nil {
|
||||
re.errors <- err
|
||||
re.runInfo <- RunInfo{
|
||||
Level: slog.LevelError,
|
||||
Message: "failed to list rules" + err.Error(),
|
||||
Details: []slog.Attr{slog.Time("due", due)},
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
for _, r := range page.Rules {
|
||||
go func(rule Rule) {
|
||||
if _, err := re.repo.UpdateRuleDue(ctx, rule.ID, rule.Schedule.NextDue()); err != nil {
|
||||
re.errors <- err
|
||||
re.runInfo <- RunInfo{Level: slog.LevelError, Message: "falied to update rule due" + err.Error(), Details: []slog.Attr{slog.Time("time", time.Now().UTC())}}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -142,23 +147,42 @@ func (re *re) StartScheduler(ctx context.Context) error {
|
||||
Protocol: protocol,
|
||||
Created: due.Unix(),
|
||||
}
|
||||
re.errors <- re.process(ctx, rule, msg)
|
||||
re.runInfo <- re.process(ctx, rule, msg)
|
||||
}(r)
|
||||
}
|
||||
// Reset due, it will reset the page meta as well.
|
||||
due = time.Now().UTC()
|
||||
|
||||
reportConfigs, err := re.repo.ListReportsConfig(ctx, pm)
|
||||
if err != nil {
|
||||
re.errors <- err
|
||||
re.runInfo <- RunInfo{
|
||||
Level: slog.LevelError,
|
||||
Message: "fiald to list reports " + err.Error(),
|
||||
Details: []slog.Attr{slog.Time("due", due)},
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
for _, c := range reportConfigs.ReportConfigs {
|
||||
go func(cfg ReportConfig) {
|
||||
if _, err := re.repo.UpdateReportDue(ctx, cfg.ID, cfg.Schedule.NextDue()); err != nil {
|
||||
re.errors <- err
|
||||
re.runInfo <- RunInfo{Level: slog.LevelError, Message: "falied to update report due" + err.Error(), Details: []slog.Attr{slog.Time("time", time.Now().UTC())}}
|
||||
return
|
||||
}
|
||||
re.errors <- re.processReportConfig(ctx, cfg)
|
||||
_, err := re.generateReport(ctx, cfg, EmailReport)
|
||||
info := RunInfo{
|
||||
Details: []slog.Attr{
|
||||
slog.String("domain_id", cfg.DomainID),
|
||||
slog.String("report_id", cfg.ID),
|
||||
slog.String("report_name", cfg.Name),
|
||||
slog.Time("time", time.Now().UTC()),
|
||||
},
|
||||
}
|
||||
if err != nil {
|
||||
info.Level = slog.LevelError
|
||||
info.Message = "failed to generate report" + err.Error()
|
||||
}
|
||||
re.runInfo <- info
|
||||
}(c)
|
||||
}
|
||||
}
|
||||
|
||||
+12
-5
@@ -5,6 +5,7 @@ package re
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -88,10 +89,16 @@ type Rule struct {
|
||||
Logic Script `json:"logic"`
|
||||
OutputChannel string `json:"output_channel,omitempty"`
|
||||
OutputTopic string `json:"output_topic,omitempty"`
|
||||
Schedule Schedule `json:"schedule,omitempty"`
|
||||
Schedule Schedule `json:"schedule"`
|
||||
Status Status `json:"status"`
|
||||
CreatedAt time.Time `json:"created_at,omitempty"`
|
||||
CreatedBy string `json:"created_by,omitempty"`
|
||||
UpdatedAt time.Time `json:"updated_at,omitempty"`
|
||||
UpdatedBy string `json:"updated_by,omitempty"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
CreatedBy string `json:"created_by"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
UpdatedBy string `json:"updated_by"`
|
||||
}
|
||||
|
||||
type RunInfo struct {
|
||||
Level slog.Level
|
||||
Details []slog.Attr
|
||||
Message string
|
||||
}
|
||||
|
||||
+3
-3
@@ -91,7 +91,7 @@ type Service interface {
|
||||
|
||||
type re struct {
|
||||
repo Repository
|
||||
errors chan error
|
||||
runInfo chan RunInfo
|
||||
idp supermq.IDProvider
|
||||
rePubSub messaging.PubSub
|
||||
writersPub messaging.Publisher
|
||||
@@ -101,11 +101,11 @@ type re struct {
|
||||
readers grpcReadersV1.ReadersServiceClient
|
||||
}
|
||||
|
||||
func NewService(repo Repository, errors chan (error), idp supermq.IDProvider, rePubSub messaging.PubSub, writersPub, alarmsPub messaging.Publisher, tck Ticker, emailer Emailer, readers grpcReadersV1.ReadersServiceClient) Service {
|
||||
func NewService(repo Repository, runInfo chan RunInfo, idp supermq.IDProvider, rePubSub messaging.PubSub, writersPub, alarmsPub messaging.Publisher, tck Ticker, emailer Emailer, readers grpcReadersV1.ReadersServiceClient) Service {
|
||||
return &re{
|
||||
repo: repo,
|
||||
idp: idp,
|
||||
errors: errors,
|
||||
runInfo: runInfo,
|
||||
rePubSub: rePubSub,
|
||||
writersPub: writersPub,
|
||||
alarmsPub: alarmsPub,
|
||||
|
||||
+19
-19
@@ -51,18 +51,18 @@ var (
|
||||
}
|
||||
)
|
||||
|
||||
func newService(t *testing.T, errs chan error) (re.Service, *mocks.Repository, *pubsubmocks.PubSub, *mocks.Ticker) {
|
||||
func newService(t *testing.T, runInfo chan re.RunInfo) (re.Service, *mocks.Repository, *pubsubmocks.PubSub, *mocks.Ticker) {
|
||||
repo := new(mocks.Repository)
|
||||
mockTicker := new(mocks.Ticker)
|
||||
idProvider := uuid.NewMock()
|
||||
pubsub := pubsubmocks.NewPubSub(t)
|
||||
readersSvc := new(readmocks.ReadersServiceClient)
|
||||
e := new(mocks.Emailer)
|
||||
return re.NewService(repo, errs, idProvider, pubsub, pubsub, pubsub, mockTicker, e, readersSvc), repo, pubsub, mockTicker
|
||||
return re.NewService(repo, runInfo, idProvider, pubsub, pubsub, pubsub, mockTicker, e, readersSvc), repo, pubsub, mockTicker
|
||||
}
|
||||
|
||||
func TestAddRule(t *testing.T) {
|
||||
svc, repo, _, _ := newService(t, make(chan error))
|
||||
svc, repo, _, _ := newService(t, make(chan re.RunInfo))
|
||||
ruleName := namegen.Generate()
|
||||
now := time.Now().Add(time.Hour)
|
||||
cases := []struct {
|
||||
@@ -137,7 +137,7 @@ func TestAddRule(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestViewRule(t *testing.T) {
|
||||
svc, repo, _, _ := newService(t, make(chan error))
|
||||
svc, repo, _, _ := newService(t, make(chan re.RunInfo))
|
||||
|
||||
now := time.Now().Add(time.Hour)
|
||||
cases := []struct {
|
||||
@@ -195,7 +195,7 @@ func TestViewRule(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUpdateRule(t *testing.T) {
|
||||
svc, repo, _, _ := newService(t, make(chan error))
|
||||
svc, repo, _, _ := newService(t, make(chan re.RunInfo))
|
||||
|
||||
newName := namegen.Generate()
|
||||
now := time.Now().Add(time.Hour)
|
||||
@@ -280,7 +280,7 @@ func TestUpdateRule(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestListRules(t *testing.T) {
|
||||
svc, repo, _, _ := newService(t, make(chan error))
|
||||
svc, repo, _, _ := newService(t, make(chan re.RunInfo))
|
||||
numRules := 50
|
||||
now := time.Now().Add(time.Hour)
|
||||
var rules []re.Rule
|
||||
@@ -385,7 +385,7 @@ func TestListRules(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRemoveRule(t *testing.T) {
|
||||
svc, repo, _, _ := newService(t, make(chan error))
|
||||
svc, repo, _, _ := newService(t, make(chan re.RunInfo))
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
@@ -425,7 +425,7 @@ func TestRemoveRule(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestEnableRule(t *testing.T) {
|
||||
svc, repo, _, _ := newService(t, make(chan error))
|
||||
svc, repo, _, _ := newService(t, make(chan re.RunInfo))
|
||||
|
||||
now := time.Now()
|
||||
|
||||
@@ -484,7 +484,7 @@ func TestEnableRule(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDisableRule(t *testing.T) {
|
||||
svc, repo, _, _ := newService(t, make(chan error))
|
||||
svc, repo, _, _ := newService(t, make(chan re.RunInfo))
|
||||
|
||||
now := time.Now()
|
||||
|
||||
@@ -543,7 +543,7 @@ func TestDisableRule(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHandle(t *testing.T) {
|
||||
svc, repo, pubmocks, _ := newService(t, make(chan error))
|
||||
svc, repo, pubmocks, _ := newService(t, make(chan re.RunInfo))
|
||||
now := time.Now()
|
||||
empty := ""
|
||||
|
||||
@@ -627,7 +627,7 @@ func TestHandle(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAddReportConfig(t *testing.T) {
|
||||
svc, repo, _, _ := newService(t, make(chan error))
|
||||
svc, repo, _, _ := newService(t, make(chan re.RunInfo))
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
@@ -679,7 +679,7 @@ func TestAddReportConfig(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestViewReportConfig(t *testing.T) {
|
||||
svc, repo, _, _ := newService(t, make(chan error))
|
||||
svc, repo, _, _ := newService(t, make(chan re.RunInfo))
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
@@ -724,7 +724,7 @@ func TestViewReportConfig(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUpdateReportConfig(t *testing.T) {
|
||||
svc, repo, _, _ := newService(t, make(chan error))
|
||||
svc, repo, _, _ := newService(t, make(chan re.RunInfo))
|
||||
|
||||
newName := namegen.Generate()
|
||||
now := time.Now().Add(time.Hour)
|
||||
@@ -787,7 +787,7 @@ func TestUpdateReportConfig(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestListReportsConfig(t *testing.T) {
|
||||
svc, repo, _, _ := newService(t, make(chan error))
|
||||
svc, repo, _, _ := newService(t, make(chan re.RunInfo))
|
||||
numConfigs := 50
|
||||
now := time.Now().Add(time.Hour)
|
||||
var configs []re.ReportConfig
|
||||
@@ -893,7 +893,7 @@ func TestListReportsConfig(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRemoveReportConfig(t *testing.T) {
|
||||
svc, repo, _, _ := newService(t, make(chan error))
|
||||
svc, repo, _, _ := newService(t, make(chan re.RunInfo))
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
@@ -933,7 +933,7 @@ func TestRemoveReportConfig(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestEnableReportConfig(t *testing.T) {
|
||||
svc, repo, _, _ := newService(t, make(chan error))
|
||||
svc, repo, _, _ := newService(t, make(chan re.RunInfo))
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
@@ -981,7 +981,7 @@ func TestEnableReportConfig(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDisableReportConfig(t *testing.T) {
|
||||
svc, repo, _, _ := newService(t, make(chan error))
|
||||
svc, repo, _, _ := newService(t, make(chan re.RunInfo))
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
@@ -1038,8 +1038,8 @@ func TestDisableReportConfig(t *testing.T) {
|
||||
|
||||
func TestStartScheduler(t *testing.T) {
|
||||
now := time.Now().Truncate(time.Minute)
|
||||
errs := make(chan error)
|
||||
svc, repo, _, ticker := newService(t, errs)
|
||||
ri := make(chan re.RunInfo)
|
||||
svc, repo, _, ticker := newService(t, ri)
|
||||
|
||||
ctxCases := []struct {
|
||||
desc string
|
||||
|
||||
Reference in New Issue
Block a user