mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 07:00:25 +00:00
NOISSUE - Refactor time parsing for scheduler (#57)
* use unix time for the scheduler Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * refactor time field Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * use constants Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * change time format Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * add logging Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix linter error Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix linter error Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * update decodeUpdateRuleRequest Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * address comments Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix wording Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * fix wording Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> * add missing words Signed-off-by: nyagamunene <stevenyaga2014@gmail.com> --------- Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>
This commit is contained in:
@@ -16,6 +16,7 @@ import (
|
||||
chclient "github.com/absmach/callhome/pkg/client"
|
||||
"github.com/absmach/magistrala/re"
|
||||
httpapi "github.com/absmach/magistrala/re/api"
|
||||
"github.com/absmach/magistrala/re/middleware"
|
||||
repg "github.com/absmach/magistrala/re/postgres"
|
||||
"github.com/absmach/supermq"
|
||||
"github.com/absmach/supermq/consumers"
|
||||
@@ -207,6 +208,7 @@ func newService(ctx context.Context, db *sqlx.DB, dbConfig pgclient.Config, auth
|
||||
|
||||
// csvc = authzmw.AuthorizationMiddleware(csvc, authz)
|
||||
csvc := re.NewService(repo, idp, nil, re.NewTicker(time.Minute))
|
||||
csvc = middleware.LoggingMiddleware(csvc, logger)
|
||||
|
||||
return csvc, nil
|
||||
}
|
||||
|
||||
+1
-1
@@ -53,7 +53,7 @@ func (req listRulesReq) validate() error {
|
||||
}
|
||||
|
||||
type updateRuleReq struct {
|
||||
Rule re.Rule `json:",inline"`
|
||||
Rule re.Rule
|
||||
}
|
||||
|
||||
func (req updateRuleReq) validate() error {
|
||||
|
||||
+4
-3
@@ -59,7 +59,7 @@ func MakeHandler(svc re.Service, authn mgauthn.Authentication, logger *slog.Logg
|
||||
opts...,
|
||||
), "list_rules").ServeHTTP)
|
||||
|
||||
r.Put("/{ruleID}", otelhttp.NewHandler(kithttp.NewServer(
|
||||
r.Patch("/{ruleID}", otelhttp.NewHandler(kithttp.NewServer(
|
||||
updateRuleEndpoint(svc),
|
||||
decodeUpdateRuleRequest,
|
||||
api.EncodeResponse,
|
||||
@@ -73,14 +73,14 @@ func MakeHandler(svc re.Service, authn mgauthn.Authentication, logger *slog.Logg
|
||||
opts...,
|
||||
), "delete_rule").ServeHTTP)
|
||||
|
||||
r.Put("/{ruleID}/enable", otelhttp.NewHandler(kithttp.NewServer(
|
||||
r.Post("/{ruleID}/enable", otelhttp.NewHandler(kithttp.NewServer(
|
||||
enableRuleEndpoint(svc),
|
||||
decodeUpdateRuleStatusRequest,
|
||||
api.EncodeResponse,
|
||||
opts...,
|
||||
), "enable_rule").ServeHTTP)
|
||||
|
||||
r.Put("/{ruleID}/disable", otelhttp.NewHandler(kithttp.NewServer(
|
||||
r.Post("/{ruleID}/disable", otelhttp.NewHandler(kithttp.NewServer(
|
||||
disableRuleEndpoint(svc),
|
||||
decodeUpdateRuleStatusRequest,
|
||||
api.EncodeResponse,
|
||||
@@ -116,6 +116,7 @@ func decodeUpdateRuleRequest(_ context.Context, r *http.Request) (interface{}, e
|
||||
if err := json.NewDecoder(r.Body).Decode(&rule); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rule.ID = chi.URLParam(r, idKey)
|
||||
return updateRuleReq{Rule: rule}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,197 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/absmach/magistrala/re"
|
||||
"github.com/absmach/supermq/pkg/authn"
|
||||
"github.com/absmach/supermq/pkg/messaging"
|
||||
)
|
||||
|
||||
var _ re.Service = (*loggingMiddleware)(nil)
|
||||
|
||||
type loggingMiddleware struct {
|
||||
logger *slog.Logger
|
||||
svc re.Service
|
||||
}
|
||||
|
||||
func LoggingMiddleware(svc re.Service, logger *slog.Logger) re.Service {
|
||||
return &loggingMiddleware{logger, svc}
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) AddRule(ctx context.Context, session authn.Session, r re.Rule) (res re.Rule, err error) {
|
||||
defer func(begin time.Time) {
|
||||
args := []any{
|
||||
slog.String("duration", time.Since(begin).String()),
|
||||
slog.String("domain_id", session.DomainID),
|
||||
slog.String("rule_name", r.Name),
|
||||
}
|
||||
if err != nil {
|
||||
args = append(args, slog.String("error", err.Error()))
|
||||
lm.logger.Warn("Add rule failed", args...)
|
||||
return
|
||||
}
|
||||
lm.logger.Info("Add rule completed successfully", args...)
|
||||
}(time.Now())
|
||||
return lm.svc.AddRule(ctx, session, r)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) ViewRule(ctx context.Context, session authn.Session, id string) (res re.Rule, err error) {
|
||||
defer func(begin time.Time) {
|
||||
args := []any{
|
||||
slog.String("duration", time.Since(begin).String()),
|
||||
slog.String("domain_id", session.DomainID),
|
||||
slog.Group("rule",
|
||||
slog.String("id", res.ID),
|
||||
slog.String("name", res.Name),
|
||||
),
|
||||
}
|
||||
if err != nil {
|
||||
args = append(args, slog.String("error", err.Error()))
|
||||
lm.logger.Warn("View rule failed", args...)
|
||||
return
|
||||
}
|
||||
lm.logger.Info("View rule completed successfully", args...)
|
||||
}(time.Now())
|
||||
return lm.svc.ViewRule(ctx, session, id)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) UpdateRule(ctx context.Context, session authn.Session, r re.Rule) (res re.Rule, err error) {
|
||||
defer func(begin time.Time) {
|
||||
args := []any{
|
||||
slog.String("duration", time.Since(begin).String()),
|
||||
slog.String("domain_id", session.DomainID),
|
||||
slog.Group("rule",
|
||||
slog.String("id", r.ID),
|
||||
slog.String("name", r.Name),
|
||||
),
|
||||
}
|
||||
if err != nil {
|
||||
args = append(args, slog.String("error", err.Error()))
|
||||
lm.logger.Warn("Update rule failed", args...)
|
||||
return
|
||||
}
|
||||
lm.logger.Info("Update rule completed successfully", args...)
|
||||
}(time.Now())
|
||||
return lm.svc.UpdateRule(ctx, session, r)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) ListRules(ctx context.Context, session authn.Session, pm re.PageMeta) (pg re.Page, err error) {
|
||||
defer func(begin time.Time) {
|
||||
args := []any{
|
||||
slog.String("duration", time.Since(begin).String()),
|
||||
slog.String("domain_id", session.DomainID),
|
||||
slog.Group("page",
|
||||
slog.Uint64("offset", pm.Offset),
|
||||
slog.Uint64("limit", pm.Limit),
|
||||
slog.Uint64("total", pg.Total),
|
||||
),
|
||||
}
|
||||
if err != nil {
|
||||
args = append(args, slog.String("error", err.Error()))
|
||||
lm.logger.Warn("List rules failed", args...)
|
||||
return
|
||||
}
|
||||
lm.logger.Info("List rules completed successfully", args...)
|
||||
}(time.Now())
|
||||
return lm.svc.ListRules(ctx, session, pm)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) RemoveRule(ctx context.Context, session authn.Session, id string) (err error) {
|
||||
defer func(begin time.Time) {
|
||||
args := []any{
|
||||
slog.String("duration", time.Since(begin).String()),
|
||||
slog.String("domain_id", session.DomainID),
|
||||
slog.String("rule_id", id),
|
||||
}
|
||||
if err != nil {
|
||||
args = append(args, slog.String("error", err.Error()))
|
||||
lm.logger.Warn("Remove rule failed", args...)
|
||||
return
|
||||
}
|
||||
lm.logger.Info("Remove rule completed successfully", args...)
|
||||
}(time.Now())
|
||||
return lm.svc.RemoveRule(ctx, session, id)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) EnableRule(ctx context.Context, session authn.Session, id string) (res re.Rule, err error) {
|
||||
defer func(begin time.Time) {
|
||||
args := []any{
|
||||
slog.String("duration", time.Since(begin).String()),
|
||||
slog.String("domain_id", session.DomainID),
|
||||
slog.Group("rule",
|
||||
slog.String("id", res.ID),
|
||||
slog.String("name", res.Name),
|
||||
),
|
||||
}
|
||||
if err != nil {
|
||||
args = append(args, slog.String("error", err.Error()))
|
||||
lm.logger.Warn("Enable rule failed", args...)
|
||||
return
|
||||
}
|
||||
lm.logger.Info("Enable rule completed successfully", args...)
|
||||
}(time.Now())
|
||||
return lm.svc.EnableRule(ctx, session, id)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) DisableRule(ctx context.Context, session authn.Session, id string) (res re.Rule, err error) {
|
||||
defer func(begin time.Time) {
|
||||
args := []any{
|
||||
slog.String("duration", time.Since(begin).String()),
|
||||
slog.String("domain_id", session.DomainID),
|
||||
slog.Group("rule",
|
||||
slog.String("id", res.ID),
|
||||
slog.String("name", res.Name),
|
||||
),
|
||||
}
|
||||
if err != nil {
|
||||
args = append(args, slog.String("error", err.Error()))
|
||||
lm.logger.Warn("Disable rule failed", args...)
|
||||
return
|
||||
}
|
||||
lm.logger.Info("Disable rule completed successfully", args...)
|
||||
}(time.Now())
|
||||
return lm.svc.DisableRule(ctx, session, id)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) StartScheduler(ctx context.Context) (err error) {
|
||||
defer func(begin time.Time) {
|
||||
args := []any{
|
||||
slog.String("duration", time.Since(begin).String()),
|
||||
}
|
||||
if err != nil {
|
||||
args = append(args, slog.String("error", err.Error()))
|
||||
lm.logger.Warn("Start scheduler failed", args...)
|
||||
return
|
||||
}
|
||||
lm.logger.Info("Start scheduler completed successfully", args...)
|
||||
}(time.Now())
|
||||
return lm.svc.StartScheduler(ctx)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) ConsumeAsync(ctx context.Context, msgs interface{}) {
|
||||
defer func(begin time.Time) {
|
||||
args := []any{
|
||||
slog.String("duration", time.Since(begin).String()),
|
||||
}
|
||||
if m, ok := msgs.(*messaging.Message); ok {
|
||||
args = append(args,
|
||||
slog.String("channel", m.Channel),
|
||||
slog.String("payload_size", fmt.Sprintf("%d", len(m.Payload))),
|
||||
)
|
||||
}
|
||||
lm.logger.Info("Message consumption completed", args...)
|
||||
}(time.Now())
|
||||
|
||||
lm.svc.ConsumeAsync(ctx, msgs)
|
||||
}
|
||||
|
||||
func (lm *loggingMiddleware) Errors() <-chan error {
|
||||
return lm.svc.Errors()
|
||||
}
|
||||
+7
-9
@@ -22,8 +22,8 @@ func (s Schedule) MarshalJSON() ([]byte, error) {
|
||||
Time string `json:"time"`
|
||||
*Alias
|
||||
}{
|
||||
StartDateTime: s.StartDateTime.Format(timeFormat),
|
||||
Time: s.Time.Format(timeFormat),
|
||||
StartDateTime: s.StartDateTime.Format(time.RFC3339),
|
||||
Time: s.Time.Format(time.RFC3339),
|
||||
Alias: (*Alias)(&s),
|
||||
}
|
||||
return json.Marshal(jTimes)
|
||||
@@ -42,16 +42,14 @@ func (s *Schedule) UnmarshalJSON(data []byte) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if aux.StartDateTime != "" {
|
||||
startDateTime, err := time.Parse(timeFormat, aux.StartDateTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.StartDateTime = startDateTime
|
||||
startDateTime, err := time.Parse(time.RFC3339, aux.StartDateTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.StartDateTime = startDateTime
|
||||
|
||||
if aux.Time != "" {
|
||||
time, err := time.Parse(timeFormat, aux.Time)
|
||||
time, err := time.Parse(time.RFC3339, aux.Time)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
+2
-2
@@ -17,7 +17,6 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
timeFormat = "2006-01-02T15:04"
|
||||
hoursInDay = 24
|
||||
daysInWeek = 7
|
||||
monthsInYear = 12
|
||||
@@ -284,7 +283,8 @@ func (r Rule) shouldRun(startTime time.Time) bool {
|
||||
}
|
||||
|
||||
t := r.Schedule.Time.Truncate(time.Minute)
|
||||
if t.Equal(startTime) {
|
||||
startTimeOnly := time.Date(0, 1, 1, startTime.Hour(), startTime.Minute(), 0, 0, time.UTC)
|
||||
if t.Equal(startTimeOnly) {
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user