mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 06:40:19 +00:00
NOISSUE - Fix RE schedule (#234)
* NOISSUE - Fix RE schedule Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com> * Add domain to the scheduled message Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com> * Remove debug log Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com> --------- Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>
This commit is contained in:
+16
-10
@@ -20,6 +20,7 @@ type Recurring uint
|
||||
|
||||
const (
|
||||
None Recurring = iota
|
||||
Hourly
|
||||
Daily
|
||||
Weekly
|
||||
Monthly
|
||||
@@ -27,6 +28,8 @@ const (
|
||||
|
||||
func (rt Recurring) String() string {
|
||||
switch rt {
|
||||
case Hourly:
|
||||
return "hourly"
|
||||
case Daily:
|
||||
return "daily"
|
||||
case Weekly:
|
||||
@@ -49,6 +52,8 @@ func (rt *Recurring) UnmarshalJSON(data []byte) error {
|
||||
}
|
||||
|
||||
switch s {
|
||||
case "hourly":
|
||||
*rt = Hourly
|
||||
case "daily":
|
||||
*rt = Daily
|
||||
case "weekly":
|
||||
@@ -64,14 +69,14 @@ func (rt *Recurring) UnmarshalJSON(data []byte) error {
|
||||
}
|
||||
|
||||
type Schedule struct {
|
||||
StartDateTime *time.Time `json:"start_datetime"` // When the schedule becomes active
|
||||
Time time.Time `json:"time"` // Specific time for the rule to run
|
||||
Recurring Recurring `json:"recurring"` // None, Daily, Weekly, Monthly
|
||||
RecurringPeriod uint `json:"recurring_period"` // Controls how many intervals to skip between executions: 1 = every interval, 2 = every second interval, etc.
|
||||
StartDateTime time.Time `json:"start_datetime,omitempty"` // When the schedule becomes active
|
||||
Time time.Time `json:"time,omitempty"` // Specific time for the rule to run
|
||||
Recurring Recurring `json:"recurring,omitempty"` // None, Daily, Weekly, Monthly
|
||||
RecurringPeriod uint `json:"recurring_period,omitempty"` // Controls how many intervals to skip between executions: 1 = every interval, 2 = every second interval, etc.
|
||||
}
|
||||
|
||||
func (s Schedule) Validate() error {
|
||||
if s.StartDateTime != nil {
|
||||
if !s.StartDateTime.IsZero() {
|
||||
now := time.Now().UTC()
|
||||
if s.StartDateTime.Before(now) {
|
||||
return ErrStartDateTimeInPast
|
||||
@@ -90,7 +95,7 @@ func (s Schedule) MarshalJSON() ([]byte, error) {
|
||||
Time: s.Time.Format(time.RFC3339),
|
||||
Alias: (*Alias)(&s),
|
||||
}
|
||||
if s.StartDateTime != nil {
|
||||
if !s.StartDateTime.IsZero() {
|
||||
formatted := s.StartDateTime.Format(time.RFC3339)
|
||||
jTimes.StartDateTime = &formatted
|
||||
}
|
||||
@@ -101,8 +106,8 @@ func (s Schedule) MarshalJSON() ([]byte, error) {
|
||||
func (s *Schedule) UnmarshalJSON(data []byte) error {
|
||||
type Alias Schedule
|
||||
temp := struct {
|
||||
StartDateTime string `json:"start_datetime"`
|
||||
Time string `json:"time"`
|
||||
StartDateTime string `json:"start_datetime,omitempty"`
|
||||
Time string `json:"time,omitempty"`
|
||||
*Alias
|
||||
}{
|
||||
Alias: (*Alias)(s),
|
||||
@@ -111,13 +116,12 @@ func (s *Schedule) UnmarshalJSON(data []byte) error {
|
||||
return err
|
||||
}
|
||||
|
||||
s.StartDateTime = nil
|
||||
if temp.StartDateTime != "" {
|
||||
startDateTime, err := time.Parse(time.RFC3339, temp.StartDateTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.StartDateTime = &startDateTime
|
||||
s.StartDateTime = startDateTime
|
||||
}
|
||||
if temp.Time != "" {
|
||||
parsedTime, err := time.Parse(time.RFC3339, temp.Time)
|
||||
@@ -131,6 +135,8 @@ func (s *Schedule) UnmarshalJSON(data []byte) error {
|
||||
|
||||
func (s Schedule) NextDue() time.Time {
|
||||
switch s.Recurring {
|
||||
case Hourly:
|
||||
return s.Time.Add(time.Hour * time.Duration(s.RecurringPeriod))
|
||||
case Daily:
|
||||
return s.Time.AddDate(0, 0, int(s.RecurringPeriod))
|
||||
case Weekly:
|
||||
|
||||
@@ -43,7 +43,7 @@ var (
|
||||
now = time.Now().UTC().Truncate(time.Minute)
|
||||
future = now.Add(1 * time.Hour)
|
||||
schedule = pkgSch.Schedule{
|
||||
StartDateTime: &future,
|
||||
StartDateTime: future,
|
||||
Recurring: pkgSch.Daily,
|
||||
RecurringPeriod: 1,
|
||||
Time: now,
|
||||
@@ -59,7 +59,7 @@ var (
|
||||
}
|
||||
past = now.Add(-1 * time.Hour)
|
||||
scheduleInPast = pkgSch.Schedule{
|
||||
StartDateTime: &past,
|
||||
StartDateTime: past,
|
||||
Recurring: pkgSch.Daily,
|
||||
RecurringPeriod: 1,
|
||||
Time: past,
|
||||
@@ -542,6 +542,18 @@ func TestUpdateRulesEndpoint(t *testing.T) {
|
||||
defer ts.Close()
|
||||
|
||||
updateRuleReq := re.Rule{
|
||||
ID: rule.ID,
|
||||
Name: rule.Name,
|
||||
Logic: re.Script{
|
||||
Type: re.ScriptType(0),
|
||||
Value: "return `test` end",
|
||||
},
|
||||
InputChannel: testsutil.GenerateUUID(&testing.T{}),
|
||||
Metadata: map[string]any{
|
||||
"name": "test",
|
||||
},
|
||||
}
|
||||
updateNoInput := re.Rule{
|
||||
ID: rule.ID,
|
||||
Name: rule.Name,
|
||||
Logic: re.Script{
|
||||
@@ -552,7 +564,6 @@ func TestUpdateRulesEndpoint(t *testing.T) {
|
||||
"name": "test",
|
||||
},
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
desc string
|
||||
token string
|
||||
@@ -578,6 +589,17 @@ func TestUpdateRulesEndpoint(t *testing.T) {
|
||||
status: http.StatusOK,
|
||||
err: nil,
|
||||
},
|
||||
{
|
||||
desc: "update rule with no input channel or schedule",
|
||||
token: validToken,
|
||||
domainID: domainID,
|
||||
id: rule.ID,
|
||||
updateReq: updateNoInput,
|
||||
contentType: contentType,
|
||||
svcResp: rule,
|
||||
status: http.StatusBadRequest,
|
||||
err: apiutil.ErrValidation,
|
||||
},
|
||||
{
|
||||
desc: "update rule with invalid token",
|
||||
token: invalidToken,
|
||||
|
||||
@@ -17,6 +17,8 @@ const (
|
||||
MaxTitleSize = 37
|
||||
)
|
||||
|
||||
var errEmptyTrigger = errors.New("rule does not have input channel or schedule")
|
||||
|
||||
type addRuleReq struct {
|
||||
re.Rule
|
||||
}
|
||||
@@ -69,6 +71,12 @@ func (req updateRuleReq) validate() error {
|
||||
if len(req.Rule.Name) > api.MaxNameSize {
|
||||
return apiutil.ErrNameSize
|
||||
}
|
||||
if err := req.Rule.Schedule.Validate(); err != nil {
|
||||
return errors.Wrap(err, apiutil.ErrValidation)
|
||||
}
|
||||
if req.Rule.InputChannel == "" && req.Rule.Schedule.StartDateTime.IsZero() {
|
||||
return errors.Wrap(errEmptyTrigger, apiutil.ErrValidation)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -149,6 +149,7 @@ func (re *re) StartScheduler(ctx context.Context) error {
|
||||
}
|
||||
|
||||
msg := &messaging.Message{
|
||||
Domain: rule.DomainID,
|
||||
Channel: rule.InputChannel,
|
||||
Subtopic: rule.InputTopic,
|
||||
Protocol: protocol,
|
||||
|
||||
@@ -100,12 +100,8 @@ func (repo *PostgresRepository) UpdateRule(ctx context.Context, r re.Rule) (re.R
|
||||
if r.Metadata != nil {
|
||||
query = append(query, "metadata = :metadata,")
|
||||
}
|
||||
if r.InputChannel != "" {
|
||||
query = append(query, "input_channel = :input_channel,")
|
||||
}
|
||||
if r.InputTopic != "" {
|
||||
query = append(query, "input_topic = :input_topic,")
|
||||
}
|
||||
query = append(query, "input_channel = :input_channel,")
|
||||
query = append(query, "input_topic = :input_topic,")
|
||||
if r.Outputs != nil {
|
||||
query = append(query, "outputs = :outputs, ")
|
||||
}
|
||||
|
||||
+3
-4
@@ -47,9 +47,8 @@ func ruleToDb(r re.Rule) (dbRule, error) {
|
||||
metadata = b
|
||||
}
|
||||
|
||||
start := sql.NullTime{}
|
||||
if r.Schedule.StartDateTime != nil && !r.Schedule.StartDateTime.IsZero() {
|
||||
start.Time = *r.Schedule.StartDateTime
|
||||
start := sql.NullTime{Time: r.Schedule.StartDateTime}
|
||||
if !r.Schedule.StartDateTime.IsZero() {
|
||||
start.Valid = true
|
||||
}
|
||||
t := sql.NullTime{Time: r.Schedule.Time}
|
||||
@@ -123,7 +122,7 @@ func dbToRule(dto dbRule) (re.Rule, error) {
|
||||
},
|
||||
Outputs: outputs,
|
||||
Schedule: schedule.Schedule{
|
||||
StartDateTime: &dto.StartDateTime.Time,
|
||||
StartDateTime: dto.StartDateTime.Time,
|
||||
Time: dto.Time.Time,
|
||||
Recurring: dto.Recurring,
|
||||
RecurringPeriod: dto.RecurringPeriod,
|
||||
|
||||
+1
-1
@@ -50,7 +50,7 @@ type Rule struct {
|
||||
InputTopic string `json:"input_topic"`
|
||||
Logic Script `json:"logic"`
|
||||
Outputs Outputs `json:"outputs,omitempty"`
|
||||
Schedule schedule.Schedule `json:"schedule"`
|
||||
Schedule schedule.Schedule `json:"schedule,omitempty"`
|
||||
Status Status `json:"status"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
CreatedBy string `json:"created_by"`
|
||||
|
||||
+3
-4
@@ -56,10 +56,10 @@ func (re *re) AddRule(ctx context.Context, session authn.Session, r Rule) (Rule,
|
||||
r.DomainID = session.DomainID
|
||||
r.Status = EnabledStatus
|
||||
|
||||
if r.Schedule.StartDateTime == nil || r.Schedule.StartDateTime.IsZero() {
|
||||
r.Schedule.StartDateTime = &now
|
||||
if !r.Schedule.StartDateTime.IsZero() {
|
||||
r.Schedule.StartDateTime = now
|
||||
}
|
||||
r.Schedule.Time = *r.Schedule.StartDateTime
|
||||
r.Schedule.Time = r.Schedule.StartDateTime
|
||||
|
||||
rule, err := re.repo.AddRule(ctx, r)
|
||||
if err != nil {
|
||||
@@ -103,7 +103,6 @@ func (re *re) UpdateRuleTags(ctx context.Context, session authn.Session, r Rule)
|
||||
func (re *re) UpdateRuleSchedule(ctx context.Context, session authn.Session, r Rule) (Rule, error) {
|
||||
r.UpdatedAt = time.Now().UTC()
|
||||
r.UpdatedBy = session.UserID
|
||||
r.Schedule.Time = *r.Schedule.StartDateTime
|
||||
rule, err := re.repo.UpdateRuleSchedule(ctx, r)
|
||||
if err != nil {
|
||||
return Rule{}, errors.Wrap(svcerr.ErrUpdateEntity, err)
|
||||
|
||||
+2
-2
@@ -38,7 +38,7 @@ var (
|
||||
inputChannel = "test.channel"
|
||||
StartDateTime = time.Now().Add(-time.Hour)
|
||||
schedule = pkgSch.Schedule{
|
||||
StartDateTime: &StartDateTime,
|
||||
StartDateTime: StartDateTime,
|
||||
Recurring: pkgSch.Daily,
|
||||
RecurringPeriod: 1,
|
||||
Time: time.Now().Add(-time.Hour),
|
||||
@@ -346,7 +346,7 @@ func TestListRules(t *testing.T) {
|
||||
Recurring: pkgSch.Daily,
|
||||
Time: now.Add(1 * time.Hour),
|
||||
RecurringPeriod: 1,
|
||||
StartDateTime: &now,
|
||||
StartDateTime: now,
|
||||
},
|
||||
}
|
||||
rules = append(rules, r)
|
||||
|
||||
@@ -43,7 +43,7 @@ var (
|
||||
now = time.Now().UTC().Truncate(time.Minute)
|
||||
future = now.Add(1 * time.Hour)
|
||||
schedule = pkgSch.Schedule{
|
||||
StartDateTime: &future,
|
||||
StartDateTime: future,
|
||||
Recurring: pkgSch.Daily,
|
||||
RecurringPeriod: 1,
|
||||
Time: future,
|
||||
@@ -127,7 +127,7 @@ func TestAddReportConfigEndpoint(t *testing.T) {
|
||||
defer ts.Close()
|
||||
|
||||
scheduleInPast := pkgSch.Schedule{
|
||||
StartDateTime: &now,
|
||||
StartDateTime: now,
|
||||
Recurring: pkgSch.Daily,
|
||||
RecurringPeriod: 1,
|
||||
Time: now,
|
||||
|
||||
@@ -60,9 +60,8 @@ func reportToDb(r reports.ReportConfig) (dbReport, error) {
|
||||
}
|
||||
email = e
|
||||
}
|
||||
start := sql.NullTime{}
|
||||
if r.Schedule.StartDateTime != nil && !r.Schedule.StartDateTime.IsZero() {
|
||||
start.Time = *r.Schedule.StartDateTime
|
||||
start := sql.NullTime{Time: r.Schedule.StartDateTime}
|
||||
if !r.Schedule.StartDateTime.IsZero() {
|
||||
start.Valid = true
|
||||
}
|
||||
t := sql.NullTime{Time: r.Schedule.Time}
|
||||
@@ -120,7 +119,7 @@ func dbToReport(dto dbReport) (reports.ReportConfig, error) {
|
||||
Config: &config,
|
||||
Metrics: metrics,
|
||||
Schedule: schedule.Schedule{
|
||||
StartDateTime: &dto.StartDateTime.Time,
|
||||
StartDateTime: dto.StartDateTime.Time,
|
||||
Time: dto.Due.Time,
|
||||
Recurring: dto.Recurring,
|
||||
RecurringPeriod: dto.RecurringPeriod,
|
||||
|
||||
+4
-4
@@ -56,10 +56,10 @@ func (r *report) AddReportConfig(ctx context.Context, session authn.Session, cfg
|
||||
cfg.DomainID = session.DomainID
|
||||
cfg.Status = EnabledStatus
|
||||
|
||||
if cfg.Schedule.StartDateTime == nil || cfg.Schedule.StartDateTime.IsZero() {
|
||||
cfg.Schedule.StartDateTime = &now
|
||||
if cfg.Schedule.StartDateTime.IsZero() {
|
||||
cfg.Schedule.StartDateTime = now
|
||||
}
|
||||
cfg.Schedule.Time = *cfg.Schedule.StartDateTime
|
||||
cfg.Schedule.Time = cfg.Schedule.StartDateTime
|
||||
|
||||
reportConfig, err := r.repo.AddReportConfig(ctx, cfg)
|
||||
if err != nil {
|
||||
@@ -92,7 +92,7 @@ func (r *report) UpdateReportConfig(ctx context.Context, session authn.Session,
|
||||
func (r *report) UpdateReportSchedule(ctx context.Context, session authn.Session, cfg ReportConfig) (ReportConfig, error) {
|
||||
cfg.UpdatedAt = time.Now().UTC()
|
||||
cfg.UpdatedBy = session.UserID
|
||||
cfg.Schedule.Time = *cfg.Schedule.StartDateTime
|
||||
cfg.Schedule.Time = cfg.Schedule.StartDateTime
|
||||
c, err := r.repo.UpdateReportSchedule(ctx, cfg)
|
||||
if err != nil {
|
||||
return ReportConfig{}, errors.Wrap(svcerr.ErrUpdateEntity, err)
|
||||
|
||||
@@ -32,7 +32,7 @@ var (
|
||||
domainID = testsutil.GenerateUUID(&testing.T{})
|
||||
now = time.Now().UTC()
|
||||
schedule = pkgSch.Schedule{
|
||||
StartDateTime: &now,
|
||||
StartDateTime: now,
|
||||
Recurring: pkgSch.Daily,
|
||||
RecurringPeriod: 1,
|
||||
Time: time.Now().Add(-time.Hour),
|
||||
|
||||
Reference in New Issue
Block a user