mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 07:00:25 +00:00
NOISSUE - Improve alarms and reports handling
Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>
This commit is contained in:
+10
-7
@@ -55,6 +55,11 @@ const (
|
||||
envPrefixDomains = "SMQ_DOMAINS_GRPC_"
|
||||
)
|
||||
|
||||
// We use a buffered channel to prevent blocking, as logging is an expensive operation.
|
||||
// A larger buffer size would also work, but we’d likely need another instance of RE in that case.
|
||||
// A smaller size would probably work too, but there's no need to be that frugal with resources.
|
||||
const channBuffer = 256
|
||||
|
||||
type config struct {
|
||||
LogLevel string `env:"MG_RE_LOG_LEVEL" envDefault:"info"`
|
||||
InstanceID string `env:"MG_RE_INSTANCE_ID" envDefault:""`
|
||||
@@ -185,7 +190,7 @@ func main() {
|
||||
}
|
||||
defer authnClient.Close()
|
||||
logger.Info("AuthN successfully connected to auth gRPC server " + authnClient.Secure())
|
||||
errs := make(chan error)
|
||||
errs := make(chan error, channBuffer)
|
||||
|
||||
domsGrpcCfg := grpcclient.Config{}
|
||||
if err := env.ParseWithOptions(&domsGrpcCfg, env.Options{Prefix: envPrefixDomains}); err != nil {
|
||||
@@ -249,14 +254,12 @@ func main() {
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
err := <-errs
|
||||
switch err {
|
||||
case nil:
|
||||
logger.Info("Handling rule completed successfully")
|
||||
default:
|
||||
for err := range errs {
|
||||
if err != nil {
|
||||
logger.Warn("Error handling rule", slog.String("error", err.Error()))
|
||||
continue
|
||||
}
|
||||
logger.Info("Handling rule completed successfully")
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
+19
-18
@@ -140,20 +140,21 @@ func (re *re) StartScheduler(ctx context.Context) error {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, rule := range page.Rules {
|
||||
go func(r Rule) {
|
||||
for _, r := range page.Rules {
|
||||
go func(rule Rule) {
|
||||
if _, err := re.repo.UpdateRuleDue(ctx, rule.ID, rule.Schedule.NextDue()); err != nil {
|
||||
re.errors <- err
|
||||
return
|
||||
}
|
||||
|
||||
msg := &messaging.Message{
|
||||
Channel: r.InputChannel,
|
||||
Subtopic: r.InputTopic,
|
||||
Channel: rule.InputChannel,
|
||||
Subtopic: rule.InputTopic,
|
||||
Protocol: protocol,
|
||||
Created: due.Unix(),
|
||||
}
|
||||
re.errors <- re.process(ctx, r, msg)
|
||||
}(rule)
|
||||
if _, err := re.repo.UpdateRuleDue(ctx, rule.ID, rule.Schedule.NextDue()); err != nil {
|
||||
re.errors <- err
|
||||
continue
|
||||
}
|
||||
re.errors <- re.process(ctx, rule, msg)
|
||||
}(r)
|
||||
}
|
||||
|
||||
reportConfigs, err := re.repo.ListReportsConfig(ctx, pm)
|
||||
@@ -162,14 +163,14 @@ func (re *re) StartScheduler(ctx context.Context) error {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, cfg := range reportConfigs.ReportConfigs {
|
||||
go func(config ReportConfig) {
|
||||
re.errors <- re.processReportConfig(ctx, config)
|
||||
}(cfg)
|
||||
if _, err := re.repo.UpdateReportDue(ctx, cfg.ID, cfg.Schedule.NextDue()); err != nil {
|
||||
re.errors <- err
|
||||
continue
|
||||
}
|
||||
for _, c := range reportConfigs.ReportConfigs {
|
||||
go func(cfg ReportConfig) {
|
||||
if _, err := re.repo.UpdateReportDue(ctx, cfg.ID, cfg.Schedule.NextDue()); err != nil {
|
||||
re.errors <- err
|
||||
return
|
||||
}
|
||||
re.errors <- re.processReportConfig(ctx, cfg)
|
||||
}(c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user