MG-36 - Global ticker for scheduled rules (#42)

* initial implementation

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* update main.go file

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* update Reccuring Type method

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* add marshalling and unmrshalling for times

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix failing linter

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix postgres storage

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* add tests for schedular

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix failing linter

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* address comments

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix linter

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* address comments

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix linter

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* refactor should run

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* refactor how rule are ran

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* address comments

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* rename method

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

---------

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>
This commit is contained in:
Steve Munene
2025-01-24 17:01:36 +03:00
committed by GitHub
parent 1f57031ea7
commit 1b6b7f558c
14 changed files with 995 additions and 92 deletions
+1 -1
View File
@@ -66,7 +66,7 @@ define make_docker_dev
-f docker/Dockerfile.dev ./build
endef
ADDON_SERVICES = bootstrap journal provision certs timescale-reader timescale-writer postgres-reader postgres-writer
ADDON_SERVICES = bootstrap journal provision certs timescale-reader timescale-writer postgres-reader postgres-writer re
EXTERNAL_SERVICES = vault prometheus
+6 -1
View File
@@ -181,6 +181,11 @@ func main() {
go chc.CallHome(ctx)
}
// Start scheduler
g.Go(func() error {
return svc.StartScheduler(ctx)
})
// Start all servers
g.Go(func() error {
return httpSvc.Start()
@@ -201,7 +206,7 @@ func newService(ctx context.Context, db *sqlx.DB, dbConfig pgclient.Config, auth
idp := uuid.New()
// csvc = authzmw.AuthorizationMiddleware(csvc, authz)
csvc := re.NewService(repo, idp, nil)
csvc := re.NewService(repo, idp, nil, re.NewTicker(time.Minute))
return csvc, nil
}
+1 -1
View File
@@ -343,7 +343,7 @@ MG_RE_DB_HOST=re-db
MG_RE_DB_PORT=5432
MG_RE_DB_USER=magistrala
MG_RE_DB_PASS=magistrala
MG_RE_DB_NAME=rule_engine
MG_RE_DB_NAME=rules_engine
MG_RE_DB_SSL_MODE=disable
MG_RE_DB_SSL_CERT=
MG_RE_DB_SSL_KEY=
+1 -1
View File
@@ -101,7 +101,7 @@ func decodeAddRuleRequest(_ context.Context, r *http.Request) (interface{}, erro
}
var rule re.Rule
if err := json.NewDecoder(r.Body).Decode(&rule); err != nil {
return nil, err
return nil, errors.Wrap(err, apiutil.ErrValidation)
}
return addRuleReq{Rule: rule}, nil
}
+189
View File
@@ -0,0 +1,189 @@
// Code generated by mockery v2.43.2. DO NOT EDIT.
// Copyright (c) Abstract Machines
package mocks
import (
context "context"
re "github.com/absmach/magistrala/re"
mock "github.com/stretchr/testify/mock"
)
// Repository is an autogenerated mock type for the Repository type
type Repository struct {
mock.Mock
}
// AddRule provides a mock function with given fields: ctx, r
func (_m *Repository) AddRule(ctx context.Context, r re.Rule) (re.Rule, error) {
ret := _m.Called(ctx, r)
if len(ret) == 0 {
panic("no return value specified for AddRule")
}
var r0 re.Rule
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, re.Rule) (re.Rule, error)); ok {
return rf(ctx, r)
}
if rf, ok := ret.Get(0).(func(context.Context, re.Rule) re.Rule); ok {
r0 = rf(ctx, r)
} else {
r0 = ret.Get(0).(re.Rule)
}
if rf, ok := ret.Get(1).(func(context.Context, re.Rule) error); ok {
r1 = rf(ctx, r)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ListRules provides a mock function with given fields: ctx, pm
func (_m *Repository) ListRules(ctx context.Context, pm re.PageMeta) (re.Page, error) {
ret := _m.Called(ctx, pm)
if len(ret) == 0 {
panic("no return value specified for ListRules")
}
var r0 re.Page
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, re.PageMeta) (re.Page, error)); ok {
return rf(ctx, pm)
}
if rf, ok := ret.Get(0).(func(context.Context, re.PageMeta) re.Page); ok {
r0 = rf(ctx, pm)
} else {
r0 = ret.Get(0).(re.Page)
}
if rf, ok := ret.Get(1).(func(context.Context, re.PageMeta) error); ok {
r1 = rf(ctx, pm)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// RemoveRule provides a mock function with given fields: ctx, id
func (_m *Repository) RemoveRule(ctx context.Context, id string) error {
ret := _m.Called(ctx, id)
if len(ret) == 0 {
panic("no return value specified for RemoveRule")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
r0 = rf(ctx, id)
} else {
r0 = ret.Error(0)
}
return r0
}
// UpdateRule provides a mock function with given fields: ctx, r
func (_m *Repository) UpdateRule(ctx context.Context, r re.Rule) (re.Rule, error) {
ret := _m.Called(ctx, r)
if len(ret) == 0 {
panic("no return value specified for UpdateRule")
}
var r0 re.Rule
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, re.Rule) (re.Rule, error)); ok {
return rf(ctx, r)
}
if rf, ok := ret.Get(0).(func(context.Context, re.Rule) re.Rule); ok {
r0 = rf(ctx, r)
} else {
r0 = ret.Get(0).(re.Rule)
}
if rf, ok := ret.Get(1).(func(context.Context, re.Rule) error); ok {
r1 = rf(ctx, r)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// UpdateRuleStatus provides a mock function with given fields: ctx, id, status
func (_m *Repository) UpdateRuleStatus(ctx context.Context, id string, status re.Status) (re.Rule, error) {
ret := _m.Called(ctx, id, status)
if len(ret) == 0 {
panic("no return value specified for UpdateRuleStatus")
}
var r0 re.Rule
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string, re.Status) (re.Rule, error)); ok {
return rf(ctx, id, status)
}
if rf, ok := ret.Get(0).(func(context.Context, string, re.Status) re.Rule); ok {
r0 = rf(ctx, id, status)
} else {
r0 = ret.Get(0).(re.Rule)
}
if rf, ok := ret.Get(1).(func(context.Context, string, re.Status) error); ok {
r1 = rf(ctx, id, status)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ViewRule provides a mock function with given fields: ctx, id
func (_m *Repository) ViewRule(ctx context.Context, id string) (re.Rule, error) {
ret := _m.Called(ctx, id)
if len(ret) == 0 {
panic("no return value specified for ViewRule")
}
var r0 re.Rule
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string) (re.Rule, error)); ok {
return rf(ctx, id)
}
if rf, ok := ret.Get(0).(func(context.Context, string) re.Rule); ok {
r0 = rf(ctx, id)
} else {
r0 = ret.Get(0).(re.Rule)
}
if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
r1 = rf(ctx, id)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// NewRepository creates a new instance of Repository. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewRepository(t interface {
mock.TestingT
Cleanup(func())
}) *Repository {
mock := &Repository{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
+263
View File
@@ -0,0 +1,263 @@
// Code generated by mockery v2.43.2. DO NOT EDIT.
// Copyright (c) Abstract Machines
package mocks
import (
context "context"
authn "github.com/absmach/supermq/pkg/authn"
mock "github.com/stretchr/testify/mock"
re "github.com/absmach/magistrala/re"
)
// Service is an autogenerated mock type for the Service type
type Service struct {
mock.Mock
}
// AddRule provides a mock function with given fields: ctx, session, r
func (_m *Service) AddRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) {
ret := _m.Called(ctx, session, r)
if len(ret) == 0 {
panic("no return value specified for AddRule")
}
var r0 re.Rule
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, authn.Session, re.Rule) (re.Rule, error)); ok {
return rf(ctx, session, r)
}
if rf, ok := ret.Get(0).(func(context.Context, authn.Session, re.Rule) re.Rule); ok {
r0 = rf(ctx, session, r)
} else {
r0 = ret.Get(0).(re.Rule)
}
if rf, ok := ret.Get(1).(func(context.Context, authn.Session, re.Rule) error); ok {
r1 = rf(ctx, session, r)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ConsumeAsync provides a mock function with given fields: ctx, messages
func (_m *Service) ConsumeAsync(ctx context.Context, messages interface{}) {
_m.Called(ctx, messages)
}
// DisableRule provides a mock function with given fields: ctx, session, id
func (_m *Service) DisableRule(ctx context.Context, session authn.Session, id string) (re.Rule, error) {
ret := _m.Called(ctx, session, id)
if len(ret) == 0 {
panic("no return value specified for DisableRule")
}
var r0 re.Rule
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, authn.Session, string) (re.Rule, error)); ok {
return rf(ctx, session, id)
}
if rf, ok := ret.Get(0).(func(context.Context, authn.Session, string) re.Rule); ok {
r0 = rf(ctx, session, id)
} else {
r0 = ret.Get(0).(re.Rule)
}
if rf, ok := ret.Get(1).(func(context.Context, authn.Session, string) error); ok {
r1 = rf(ctx, session, id)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// EnableRule provides a mock function with given fields: ctx, session, id
func (_m *Service) EnableRule(ctx context.Context, session authn.Session, id string) (re.Rule, error) {
ret := _m.Called(ctx, session, id)
if len(ret) == 0 {
panic("no return value specified for EnableRule")
}
var r0 re.Rule
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, authn.Session, string) (re.Rule, error)); ok {
return rf(ctx, session, id)
}
if rf, ok := ret.Get(0).(func(context.Context, authn.Session, string) re.Rule); ok {
r0 = rf(ctx, session, id)
} else {
r0 = ret.Get(0).(re.Rule)
}
if rf, ok := ret.Get(1).(func(context.Context, authn.Session, string) error); ok {
r1 = rf(ctx, session, id)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Errors provides a mock function with given fields:
func (_m *Service) Errors() <-chan error {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Errors")
}
var r0 <-chan error
if rf, ok := ret.Get(0).(func() <-chan error); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan error)
}
}
return r0
}
// ListRules provides a mock function with given fields: ctx, session, pm
func (_m *Service) ListRules(ctx context.Context, session authn.Session, pm re.PageMeta) (re.Page, error) {
ret := _m.Called(ctx, session, pm)
if len(ret) == 0 {
panic("no return value specified for ListRules")
}
var r0 re.Page
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, authn.Session, re.PageMeta) (re.Page, error)); ok {
return rf(ctx, session, pm)
}
if rf, ok := ret.Get(0).(func(context.Context, authn.Session, re.PageMeta) re.Page); ok {
r0 = rf(ctx, session, pm)
} else {
r0 = ret.Get(0).(re.Page)
}
if rf, ok := ret.Get(1).(func(context.Context, authn.Session, re.PageMeta) error); ok {
r1 = rf(ctx, session, pm)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// RemoveRule provides a mock function with given fields: ctx, session, id
func (_m *Service) RemoveRule(ctx context.Context, session authn.Session, id string) error {
ret := _m.Called(ctx, session, id)
if len(ret) == 0 {
panic("no return value specified for RemoveRule")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, authn.Session, string) error); ok {
r0 = rf(ctx, session, id)
} else {
r0 = ret.Error(0)
}
return r0
}
// StartScheduler provides a mock function with given fields: ctx
func (_m *Service) StartScheduler(ctx context.Context) error {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for StartScheduler")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}
return r0
}
// UpdateRule provides a mock function with given fields: ctx, session, r
func (_m *Service) UpdateRule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) {
ret := _m.Called(ctx, session, r)
if len(ret) == 0 {
panic("no return value specified for UpdateRule")
}
var r0 re.Rule
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, authn.Session, re.Rule) (re.Rule, error)); ok {
return rf(ctx, session, r)
}
if rf, ok := ret.Get(0).(func(context.Context, authn.Session, re.Rule) re.Rule); ok {
r0 = rf(ctx, session, r)
} else {
r0 = ret.Get(0).(re.Rule)
}
if rf, ok := ret.Get(1).(func(context.Context, authn.Session, re.Rule) error); ok {
r1 = rf(ctx, session, r)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ViewRule provides a mock function with given fields: ctx, session, id
func (_m *Service) ViewRule(ctx context.Context, session authn.Session, id string) (re.Rule, error) {
ret := _m.Called(ctx, session, id)
if len(ret) == 0 {
panic("no return value specified for ViewRule")
}
var r0 re.Rule
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, authn.Session, string) (re.Rule, error)); ok {
return rf(ctx, session, id)
}
if rf, ok := ret.Get(0).(func(context.Context, authn.Session, string) re.Rule); ok {
r0 = rf(ctx, session, id)
} else {
r0 = ret.Get(0).(re.Rule)
}
if rf, ok := ret.Get(1).(func(context.Context, authn.Session, string) error); ok {
r1 = rf(ctx, session, id)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// NewService creates a new instance of Service. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewService(t interface {
mock.TestingT
Cleanup(func())
}) *Service {
mock := &Service{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
+55
View File
@@ -0,0 +1,55 @@
// Code generated by mockery v2.43.2. DO NOT EDIT.
// Copyright (c) Abstract Machines
package mocks
import (
mock "github.com/stretchr/testify/mock"
time "time"
)
// Ticker is an autogenerated mock type for the Ticker type
type Ticker struct {
mock.Mock
}
// Stop provides a mock function with given fields:
func (_m *Ticker) Stop() {
_m.Called()
}
// Tick provides a mock function with given fields:
func (_m *Ticker) Tick() <-chan time.Time {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for Tick")
}
var r0 <-chan time.Time
if rf, ok := ret.Get(0).(func() <-chan time.Time); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan time.Time)
}
}
return r0
}
// NewTicker creates a new instance of Ticker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewTicker(t interface {
mock.TestingT
Cleanup(func())
}) *Ticker {
mock := &Ticker{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
+4 -3
View File
@@ -32,9 +32,10 @@ func Migration() *migrate.MemoryMigrationSource {
status SMALLINT NOT NULL DEFAULT 0 CHECK (status >= 0),
logic_type SMALLINT NOT NULL DEFAULT 0 CHECK (status >= 0),
logic_value BYTEA,
recurring_time TIMESTAMP[],
recurring_type SMALLINT,
recurring_period SMALLINT
time TIMESTAMP,
recurring SMALLINT,
recurring_period SMALLINT,
start_datetime TIMESTAMP
)`,
},
Down: []string{
+8 -8
View File
@@ -18,16 +18,16 @@ import (
const (
addRuleQuery = `
INSERT INTO rules (id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value,
output_channel, output_topic, recurring_time, recurring_type, recurring_period, created_at, created_by, updated_at, updated_by, status)
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status)
VALUES (:id, :name, :domain_id, :metadata, :input_channel, :input_topic, :logic_type, :logic_value,
:output_channel, :output_topic, :recurring_time, :recurring_type, :recurring_period, :created_at, :created_by, :updated_at, :updated_by, :status)
:output_channel, :output_topic, :start_datetime, :time, :recurring, :recurring_period, :created_at, :created_by, :updated_at, :updated_by, :status)
RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value,
output_channel, output_topic, recurring_time, recurring_type, recurring_period, created_at, created_by, updated_at, updated_by, status;
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;
`
viewRuleQuery = `
SELECT id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value, output_channel,
output_topic, recurring_time, recurring_type, recurring_period, created_at, created_by, updated_at, updated_by, status
output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status
FROM rules
WHERE id = $1;
`
@@ -36,11 +36,11 @@ const (
UPDATE rules
SET name = :name, metadata = :metadata, input_channel = :input_channel, input_topic = :input_topic, logic_type = :logic_type,
logic_value = :logic_value, output_channel = :output_channel, output_topic = :output_topic,
recurring_time = :recurring_time, recurring_type = :recurring_type,
start_datetime = :start_datetime, time = :time, recurring = :recurring,
recurring_period = :recurring_period, updated_at = :updated_at, updated_by = :updated_by, status = :status
WHERE id = :id
RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value,
output_channel, output_topic, recurring_time, recurring_type, recurring_period, created_at, created_by, updated_at, updated_by, status;
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;
`
removeRuleQuery = `
@@ -53,12 +53,12 @@ const (
SET status = $2
WHERE id = $1
RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_value,
output_channel, output_topic, recurring_time, recurring_type, recurring_period, created_at, created_by, updated_at, updated_by, status;
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;
`
listRulesQuery = `
SELECT id, name, domain_id, input_channel, input_topic, logic_type, logic_value, output_channel,
output_topic, recurring_time, recurring_type, recurring_period, created_at, created_by, updated_at, updated_by, status
output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status
FROM rules r %s %s;
`
+25 -49
View File
@@ -10,29 +10,29 @@ import (
"github.com/absmach/magistrala/re"
"github.com/absmach/supermq/pkg/errors"
"github.com/jackc/pgx/v5/pgtype"
)
// dbRule represents the database structure for a Rule.
type dbRule struct {
ID string `db:"id"`
Name string `db:"name"`
DomainID string `db:"domain_id"`
Metadata []byte `db:"metadata,omitempty"`
InputChannel string `db:"input_channel"`
InputTopic sql.NullString `db:"input_topic"`
LogicType re.ScriptType `db:"logic_type"`
LogicValue string `db:"logic_value"`
OutputChannel sql.NullString `db:"output_channel"`
OutputTopic sql.NullString `db:"output_topic"`
RecurringTime *pgtype.Array[string] `db:"recurring_time"`
RecurringType re.ReccuringType `db:"recurring_type"`
RecurringPeriod uint `db:"recurring_period"`
Status re.Status `db:"status"`
CreatedAt time.Time `db:"created_at"`
CreatedBy string `db:"created_by"`
UpdatedAt time.Time `db:"updated_at"`
UpdatedBy string `db:"updated_by"`
ID string `db:"id"`
Name string `db:"name"`
DomainID string `db:"domain_id"`
Metadata []byte `db:"metadata,omitempty"`
InputChannel string `db:"input_channel"`
InputTopic sql.NullString `db:"input_topic"`
LogicType re.ScriptType `db:"logic_type"`
LogicValue string `db:"logic_value"`
OutputChannel sql.NullString `db:"output_channel"`
OutputTopic sql.NullString `db:"output_topic"`
StartDateTime time.Time `db:"start_datetime"`
Time time.Time `db:"time"`
Recurring re.Recurring `db:"recurring"`
RecurringPeriod uint `db:"recurring_period"`
Status re.Status `db:"status"`
CreatedAt time.Time `db:"created_at"`
CreatedBy string `db:"created_by"`
UpdatedAt time.Time `db:"updated_at"`
UpdatedBy string `db:"updated_by"`
}
func ruleToDb(r re.Rule) (dbRule, error) {
@@ -55,8 +55,9 @@ func ruleToDb(r re.Rule) (dbRule, error) {
LogicValue: r.Logic.Value,
OutputChannel: toNullString(r.OutputChannel),
OutputTopic: toNullString(r.OutputTopic),
RecurringTime: toStringArray(r.Schedule.Time),
RecurringType: r.Schedule.RecurringType,
StartDateTime: r.Schedule.StartDateTime,
Time: r.Schedule.Time,
Recurring: r.Schedule.Recurring,
RecurringPeriod: r.Schedule.RecurringPeriod,
Status: r.Status,
CreatedAt: r.CreatedAt,
@@ -87,8 +88,9 @@ func dbToRule(dto dbRule) (re.Rule, error) {
OutputChannel: fromNullString(dto.OutputChannel),
OutputTopic: fromNullString(dto.OutputTopic),
Schedule: re.Schedule{
Time: toTimeSlice(dto.RecurringTime),
RecurringType: dto.RecurringType,
StartDateTime: dto.StartDateTime,
Time: dto.Time,
Recurring: dto.Recurring,
RecurringPeriod: dto.RecurringPeriod,
},
Status: re.Status(dto.Status),
@@ -112,29 +114,3 @@ func fromNullString(nullString sql.NullString) string {
}
return nullString.String
}
func toStringArray(times []time.Time) *pgtype.Array[string] {
var strArray []string
for _, t := range times {
strArray = append(strArray, t.Format(time.RFC3339))
}
ret := pgtype.Array[string]{
Elements: strArray,
Valid: true,
}
return &ret
}
func toTimeSlice(strArray *pgtype.Array[string]) []time.Time {
if strArray == nil || !strArray.Valid {
return []time.Time{}
}
var times []time.Time
for _, s := range strArray.Elements {
t, err := time.Parse(time.RFC3339, s)
if err == nil {
times = append(times, t)
}
}
return times
}
+109
View File
@@ -0,0 +1,109 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package re
import (
"encoding/json"
"time"
)
type Schedule struct {
StartDateTime time.Time `json:"start_datetime"` // When the schedule becomes active
Time time.Time `json:"time"` // Specific time for the rule to run
Recurring Recurring `json:"recurring"` // None, Daily, Weekly, Monthly
RecurringPeriod uint `json:"recurring_period"` // Controls how many intervals to skip between executions: 1 = every interval, 2 = every second interval, etc.
}
func (s Schedule) MarshalJSON() ([]byte, error) {
type Alias Schedule
jTimes := struct {
StartDateTime string `json:"start_datetime"`
Time string `json:"time"`
*Alias
}{
StartDateTime: s.StartDateTime.Format(timeFormat),
Time: s.Time.Format(timeFormat),
Alias: (*Alias)(&s),
}
return json.Marshal(jTimes)
}
func (s *Schedule) UnmarshalJSON(data []byte) error {
type Alias Schedule
aux := struct {
StartDateTime string `json:"start_datetime"`
Time string `json:"time"`
*Alias
}{
Alias: (*Alias)(s),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
if aux.StartDateTime != "" {
startDateTime, err := time.Parse(timeFormat, aux.StartDateTime)
if err != nil {
return err
}
s.StartDateTime = startDateTime
}
if aux.Time != "" {
time, err := time.Parse(timeFormat, aux.Time)
if err != nil {
return err
}
s.Time = time
}
return nil
}
// Type can be daily, weekly or monthly.
type Recurring uint
const (
None Recurring = iota
Daily
Weekly
Monthly
)
func (rt Recurring) String() string {
switch rt {
case Daily:
return "daily"
case Weekly:
return "weekly"
case Monthly:
return "monthly"
default:
return "none"
}
}
func (rt Recurring) MarshalJSON() ([]byte, error) {
return json.Marshal(rt.String())
}
func (rt *Recurring) UnmarshalJSON(data []byte) error {
var s string
if err := json.Unmarshal(data, &s); err != nil {
return err
}
switch s {
case "daily":
*rt = Daily
case "weekly":
*rt = Weekly
case "monthly":
*rt = Monthly
case "none":
*rt = None
default:
return ErrInvalidRecurringType
}
return nil
}
+122 -28
View File
@@ -5,6 +5,7 @@ package re
import (
"context"
"errors"
"time"
"github.com/absmach/supermq"
@@ -15,6 +16,15 @@ import (
lua "github.com/yuin/gopher-lua"
)
const (
timeFormat = "2006-01-02T15:04"
hoursInDay = 24
daysInWeek = 7
monthsInYear = 12
)
var ErrInvalidRecurringType = errors.New("invalid recurring type")
type (
ScriptType uint
Metadata map[string]interface{}
@@ -24,22 +34,6 @@ type (
}
)
// Type can be daily, weekly or monthly.
type ReccuringType uint
const (
None ReccuringType = iota
Daily
Weekly
Monthly
)
type Schedule struct {
Time []time.Time `json:"date,omitempty"`
RecurringType ReccuringType `json:"recurring_type"`
RecurringPeriod uint `json:"recurring_period"` // 1 meaning every Recurring value, 2 meaning every other, and so on.
}
type Rule struct {
ID string `json:"id"`
Name string `json:"name"`
@@ -58,6 +52,7 @@ type Rule struct {
UpdatedBy string `json:"updated_by,omitempty"`
}
//go:generate mockery --name Repository --output=./mocks --filename repo.go --quiet --note "Copyright (c) Abstract Machines"
type Repository interface {
AddRule(ctx context.Context, r Rule) (Rule, error)
ViewRule(ctx context.Context, id string) (Rule, error)
@@ -69,15 +64,18 @@ type Repository interface {
// PageMeta contains page metadata that helps navigation.
type PageMeta struct {
Total uint64 `json:"total" db:"total"`
Offset uint64 `json:"offset" db:"offset"`
Limit uint64 `json:"limit" db:"limit"`
Dir string `json:"dir" db:"dir"`
Name string `json:"name" db:"name"`
InputChannel string `json:"input_channel,omitempty" db:"input_channel"`
OutputChannel string `json:"output_channel,omitempty" db:"output_channel"`
Status Status `json:"status,omitempty" db:"status"`
Domain string `json:"domain_id,omitempty" db:"domain_id"`
Total uint64 `json:"total" db:"total"`
Offset uint64 `json:"offset" db:"offset"`
Limit uint64 `json:"limit" db:"limit"`
Dir string `json:"dir" db:"dir"`
Name string `json:"name" db:"name"`
InputChannel string `json:"input_channel,omitempty" db:"input_channel"`
OutputChannel string `json:"output_channel,omitempty" db:"output_channel"`
Status Status `json:"status,omitempty" db:"status"`
Domain string `json:"domain_id,omitempty" db:"domain_id"`
ScheduledBefore *time.Time `json:"scheduled_before,omitempty" db:"scheduled_before"` // Filter rules scheduled before this time
ScheduledAfter *time.Time `json:"scheduled_after,omitempty" db:"scheduled_after"` // Filter rules scheduled after this time
Recurring *Recurring `json:"recurring,omitempty" db:"recurring"` // Filter by recurring type
}
type Page struct {
@@ -85,6 +83,7 @@ type Page struct {
Rules []Rule `json:"rules"`
}
//go:generate mockery --name Service --output=./mocks --filename service.go --quiet --note "Copyright (c) Abstract Machines"
type Service interface {
consumers.AsyncConsumer
AddRule(ctx context.Context, session authn.Session, r Rule) (Rule, error)
@@ -94,6 +93,7 @@ type Service interface {
RemoveRule(ctx context.Context, session authn.Session, id string) error
EnableRule(ctx context.Context, session authn.Session, id string) (Rule, error)
DisableRule(ctx context.Context, session authn.Session, id string) (Rule, error)
StartScheduler(ctx context.Context) error
}
type re struct {
@@ -101,14 +101,16 @@ type re struct {
repo Repository
pubSub messaging.PubSub
errors chan error
ticker Ticker
}
func NewService(repo Repository, idp supermq.IDProvider, pubSub messaging.PubSub) Service {
func NewService(repo Repository, idp supermq.IDProvider, pubSub messaging.PubSub, tck Ticker) Service {
return &re{
repo: repo,
idp: idp,
pubSub: pubSub,
errors: make(chan error),
ticker: tck,
}
}
@@ -117,12 +119,23 @@ func (re *re) AddRule(ctx context.Context, session authn.Session, r Rule) (Rule,
if err != nil {
return Rule{}, err
}
r.CreatedAt = time.Now()
now := time.Now()
r.CreatedAt = now
r.ID = id
r.CreatedBy = session.UserID
r.DomainID = session.DomainID
r.Status = EnabledStatus
return re.repo.AddRule(ctx, r)
if r.Schedule.StartDateTime.IsZero() {
r.Schedule.StartDateTime = now
}
rule, err := re.repo.AddRule(ctx, r)
if err != nil {
return Rule{}, err
}
return rule, nil
}
func (re *re) ViewRule(ctx context.Context, session authn.Session, id string) (Rule, error) {
@@ -227,3 +240,84 @@ func (re *re) process(ctx context.Context, r Rule, msg *messaging.Message) error
return re.pubSub.Publish(ctx, m.Channel, m)
}
}
func (re *re) StartScheduler(ctx context.Context) error {
defer re.ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-re.ticker.Tick():
startTime := time.Now()
pm := PageMeta{
Status: EnabledStatus,
ScheduledBefore: &startTime,
}
page, err := re.repo.ListRules(ctx, pm)
if err != nil {
return err
}
for _, rule := range page.Rules {
if rule.shouldRun(startTime) {
go func(r Rule) {
msg := &messaging.Message{
Channel: r.InputChannel,
Created: startTime.Unix(),
}
re.errors <- re.process(ctx, r, msg)
}(rule)
}
}
}
}
}
func (r Rule) shouldRun(startTime time.Time) bool {
// Don't run if the rule's start time is in the future
// This allows scheduling rules to start at a specific future time
if r.Schedule.StartDateTime.After(startTime) {
return false
}
t := r.Schedule.Time.Truncate(time.Minute)
if t.Equal(startTime) {
return true
}
if r.Schedule.RecurringPeriod == 0 {
return false
}
period := int(r.Schedule.RecurringPeriod)
switch r.Schedule.Recurring {
case Daily:
if r.Schedule.RecurringPeriod > 0 {
daysSinceStart := startTime.Sub(r.Schedule.StartDateTime).Hours() / hoursInDay
if int(daysSinceStart)%period == 0 {
return true
}
}
case Weekly:
if r.Schedule.RecurringPeriod > 0 {
weeksSinceStart := startTime.Sub(r.Schedule.StartDateTime).Hours() / (hoursInDay * daysInWeek)
if int(weeksSinceStart)%period == 0 {
return true
}
}
case Monthly:
if r.Schedule.RecurringPeriod > 0 {
monthsSinceStart := (startTime.Year()-r.Schedule.StartDateTime.Year())*monthsInYear +
int(startTime.Month()-r.Schedule.StartDateTime.Month())
if monthsSinceStart%period == 0 {
return true
}
}
}
return false
}
+187
View File
@@ -0,0 +1,187 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package re_test
import (
"context"
"fmt"
"testing"
"time"
"github.com/0x6flab/namegenerator"
"github.com/absmach/magistrala/internal/testsutil"
"github.com/absmach/magistrala/pkg/errors"
"github.com/absmach/magistrala/re"
"github.com/absmach/magistrala/re/mocks"
repoerr "github.com/absmach/supermq/pkg/errors/repository"
pubsubmocks "github.com/absmach/supermq/pkg/messaging/mocks"
"github.com/absmach/supermq/pkg/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
var (
namegen = namegenerator.NewGenerator()
rule = re.Rule{
ID: testsutil.GenerateUUID(&testing.T{}),
Name: namegen.Generate(),
InputChannel: "test.channel",
Status: re.EnabledStatus,
Schedule: re.Schedule{
StartDateTime: time.Now().Add(-time.Hour), // Started an hour ago
Recurring: re.Daily,
RecurringPeriod: 1,
Time: time.Now().Add(-time.Hour),
},
}
futureRule = re.Rule{
ID: testsutil.GenerateUUID(&testing.T{}),
Name: namegen.Generate(),
InputChannel: "test.channel",
Status: re.EnabledStatus,
Schedule: re.Schedule{
StartDateTime: time.Now().Add(24 * time.Hour),
Recurring: re.None,
},
}
)
func newService(t *testing.T) (re.Service, *mocks.Repository, *mocks.Ticker) {
repo := new(mocks.Repository)
mockTicker := new(mocks.Ticker)
idProvider := uuid.NewMock()
pubsub := pubsubmocks.NewPubSub(t)
return re.NewService(repo, idProvider, pubsub, mockTicker), repo, mockTicker
}
func TestStartScheduler(t *testing.T) {
now := time.Now().Truncate(time.Minute)
svc, repo, ticker := newService(t)
cases := []struct {
desc string
err error
pageMeta re.PageMeta
page re.Page
listErr error
setupCtx func() (context.Context, context.CancelFunc)
}{
{
desc: "start scheduler with canceled context",
err: context.Canceled,
pageMeta: re.PageMeta{
Status: re.EnabledStatus,
ScheduledBefore: &now,
},
setupCtx: func() (context.Context, context.CancelFunc) {
return context.WithCancel(context.Background())
},
},
{
desc: "start scheduler with timeout",
err: context.DeadlineExceeded,
pageMeta: re.PageMeta{
Status: re.EnabledStatus,
ScheduledBefore: &now,
},
setupCtx: func() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), time.Millisecond)
},
},
{
desc: "start scheduler with deadline exceeded",
err: context.DeadlineExceeded,
pageMeta: re.PageMeta{
Status: re.EnabledStatus,
ScheduledBefore: &now,
},
page: re.Page{},
setupCtx: func() (context.Context, context.CancelFunc) {
return context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond))
},
},
{
desc: "start scheduler successfully processes rules",
err: context.Canceled,
pageMeta: re.PageMeta{
Status: re.EnabledStatus,
ScheduledBefore: &now,
},
page: re.Page{
Rules: []re.Rule{rule},
},
setupCtx: func() (context.Context, context.CancelFunc) {
return context.WithCancel(context.Background())
},
},
{
desc: "start scheduler with list error",
err: repoerr.ErrViewEntity,
pageMeta: re.PageMeta{
Status: re.EnabledStatus,
ScheduledBefore: &now,
},
page: re.Page{},
listErr: repoerr.ErrViewEntity,
setupCtx: func() (context.Context, context.CancelFunc) {
return context.WithCancel(context.Background())
},
},
{
desc: "start scheduler with rule to be run in the future",
err: context.Canceled,
pageMeta: re.PageMeta{
Status: re.EnabledStatus,
ScheduledBefore: &now,
},
page: re.Page{
Rules: []re.Rule{futureRule},
},
setupCtx: func() (context.Context, context.CancelFunc) {
return context.WithCancel(context.Background())
},
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
repoCall := repo.On("ListRules", mock.Anything, mock.Anything).Return(tc.page, tc.listErr)
tickChan := make(chan time.Time)
tickCall := ticker.On("Tick").Return((<-chan time.Time)(tickChan))
tickCall1 := ticker.On("Stop").Return()
ctx, cancel := tc.setupCtx()
defer cancel()
errc := make(chan error)
go func() {
errc <- svc.StartScheduler(ctx)
}()
switch tc.desc {
case "start scheduler with canceled context":
cancel()
case "start scheduler successfully processes rules":
tickChan <- time.Now()
time.Sleep(100 * time.Millisecond)
cancel()
case "start scheduler with rule to be run in the future":
tickChan <- time.Now()
time.Sleep(100 * time.Millisecond)
cancel()
case "start scheduler with list error":
tickChan <- time.Now()
time.Sleep(100 * time.Millisecond)
if err := svc.Errors(); err != nil {
cancel()
}
}
err := <-errc
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("expected error %v but got %v", tc.err, err))
repoCall.Unset()
tickCall.Unset()
tickCall1.Unset()
})
}
}
+24
View File
@@ -0,0 +1,24 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
package re
import "time"
//go:generate mockery --name Ticker --output=./mocks --filename ticker.go --quiet --note "Copyright (c) Abstract Machines"
type Ticker interface {
Tick() <-chan time.Time
Stop()
}
type timeTicker struct {
*time.Ticker
}
func NewTicker(d time.Duration) Ticker {
return &timeTicker{time.NewTicker(d)}
}
func (t *timeTicker) Tick() <-chan time.Time {
return t.C
}