NOISSUE - Update Rules Service (#32)

* update re service

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* add status update for rules

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

* revert docker compose changes

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>

---------

Signed-off-by: ianmuchyri <ianmuchiri8@gmail.com>
This commit is contained in:
Ian Ngethe Muchiri
2025-01-09 11:56:45 +03:00
committed by GitHub
parent 3ee7b13538
commit 3f0bb258c7
10 changed files with 262 additions and 105 deletions
+7 -7
View File
@@ -39,23 +39,23 @@ import (
const (
svcName = "rules_engine"
envPrefixDB = "SMQ_RE_DB_"
envPrefixHTTP = "SMQ_RE_HTTP_"
envPrefixDB = "MG_RE_DB_"
envPrefixHTTP = "MG_RE_HTTP_"
envPrefixAuth = "SMQ_AUTH_GRPC_"
defDB = "r"
defSvcHTTPPort = "9008"
)
type config struct {
LogLevel string `env:"SMQ_RE_LOG_LEVEL" envDefault:"info"`
InstanceID string `env:"SMQ_RE_INSTANCE_ID" envDefault:""`
LogLevel string `env:"MG_RE_LOG_LEVEL" envDefault:"info"`
InstanceID string `env:"MG_RE_INSTANCE_ID" envDefault:""`
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
CacheURL string `env:"SMQ_RE_CACHE_URL" envDefault:"redis://localhost:6379/0"`
CacheKeyDuration time.Duration `env:"SMQ_RE_CACHE_KEY_DURATION" envDefault:"10m"`
CacheURL string `env:"MG_RE_CACHE_URL" envDefault:"redis://localhost:6379/0"`
CacheKeyDuration time.Duration `env:"MG_RE_CACHE_KEY_DURATION" envDefault:"10m"`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
ConfigPath string `env:"SMQ_RE_CONFIG_PATH" envDefault:"/config.toml"`
ConfigPath string `env:"MG_RE_CONFIG_PATH" envDefault:"/config.toml"`
BrokerURL string `env:"SMQ_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
}
+15 -15
View File
@@ -334,21 +334,21 @@ SMQ_CHANNELS_DB_SSL_ROOT_CERT=
SMQ_CHANNELS_INSTANCE_ID=
### RE
SMQ_RE_LOG_LEVEL=debug
SMQ_RE_HTTP_HOST=re
SMQ_RE_HTTP_PORT=9008
SMQ_RE_HTTP_SERVER_CERT=
SMQ_RE_HTTP_SERVER_KEY=
SMQ_RE_DB_HOST=re-db
SMQ_RE_DB_PORT=5432
SMQ_RE_DB_USER=magistrala
SMQ_RE_DB_PASS=magistrala
SMQ_RE_DB_NAME=rule_engine
SMQ_RE_DB_SSL_MODE=disable
SMQ_RE_DB_SSL_CERT=
SMQ_RE_DB_SSL_KEY=
SMQ_RE_DB_SSL_ROOT_CERT=
SMQ_RE_INSTANCE_ID=
MG_RE_LOG_LEVEL=debug
MG_RE_HTTP_HOST=re
MG_RE_HTTP_PORT=9008
MG_RE_HTTP_SERVER_CERT=
MG_RE_HTTP_SERVER_KEY=
MG_RE_DB_HOST=re-db
MG_RE_DB_PORT=5432
MG_RE_DB_USER=magistrala
MG_RE_DB_PASS=magistrala
MG_RE_DB_NAME=rule_engine
MG_RE_DB_SSL_MODE=disable
MG_RE_DB_SSL_CERT=
MG_RE_DB_SSL_KEY=
MG_RE_DB_SSL_ROOT_CERT=
MG_RE_INSTANCE_ID=
#### Channels Client Config
SMQ_CHANNELS_URL=http://channels:9005
+20 -20
View File
@@ -22,11 +22,11 @@ services:
restart: on-failure
command: postgres -c "max_connections=${SMQ_POSTGRES_MAX_CONNECTIONS}"
environment:
POSTGRES_USER: ${SMQ_RE_DB_USER}
POSTGRES_PASSWORD: ${SMQ_RE_DB_PASS}
POSTGRES_DB: ${SMQ_RE_DB_NAME}
POSTGRES_USER: ${MG_RE_DB_USER}
POSTGRES_PASSWORD: ${MG_RE_DB_PASS}
POSTGRES_DB: ${MG_RE_DB_NAME}
ports:
- 6008:5432
- 6009:5432
networks:
- magistrala-base-net
volumes:
@@ -39,20 +39,20 @@ services:
- re-db
restart: on-failure
environment:
SMQ_RE_LOG_LEVEL: ${SMQ_RE_LOG_LEVEL}
SMQ_RE_HTTP_PORT: ${SMQ_RE_HTTP_PORT}
SMQ_RE_HTTP_HOST: ${SMQ_RE_HTTP_HOST}
SMQ_RE_HTTP_SERVER_CERT: ${SMQ_RE_HTTP_SERVER_CERT}
SMQ_RE_HTTP_SERVER_KEY: ${SMQ_RE_HTTP_SERVER_KEY}
SMQ_RE_DB_HOST: ${SMQ_RE_DB_HOST}
SMQ_RE_DB_PORT: ${SMQ_RE_DB_PORT}
SMQ_RE_DB_USER: ${SMQ_RE_DB_USER}
SMQ_RE_DB_PASS: ${SMQ_RE_DB_PASS}
SMQ_RE_DB_NAME: ${SMQ_RE_DB_NAME}
SMQ_RE_DB_SSL_MODE: ${SMQ_RE_DB_SSL_MODE}
SMQ_RE_DB_SSL_CERT: ${SMQ_RE_DB_SSL_CERT}
SMQ_RE_DB_SSL_KEY: ${SMQ_RE_DB_SSL_KEY}
SMQ_RE_DB_SSL_ROOT_CERT: ${SMQ_RE_DB_SSL_ROOT_CERT}
MG_RE_LOG_LEVEL: ${MG_RE_LOG_LEVEL}
MG_RE_HTTP_PORT: ${MG_RE_HTTP_PORT}
MG_RE_HTTP_HOST: ${MG_RE_HTTP_HOST}
MG_RE_HTTP_SERVER_CERT: ${MG_RE_HTTP_SERVER_CERT}
MG_RE_HTTP_SERVER_KEY: ${MG_RE_HTTP_SERVER_KEY}
MG_RE_DB_HOST: ${MG_RE_DB_HOST}
MG_RE_DB_PORT: ${MG_RE_DB_PORT}
MG_RE_DB_USER: ${MG_RE_DB_USER}
MG_RE_DB_PASS: ${MG_RE_DB_PASS}
MG_RE_DB_NAME: ${MG_RE_DB_NAME}
MG_RE_DB_SSL_MODE: ${MG_RE_DB_SSL_MODE}
MG_RE_DB_SSL_CERT: ${MG_RE_DB_SSL_CERT}
MG_RE_DB_SSL_KEY: ${MG_RE_DB_SSL_KEY}
MG_RE_DB_SSL_ROOT_CERT: ${MG_RE_DB_SSL_ROOT_CERT}
SMQ_MESSAGE_BROKER_URL: ${SMQ_MESSAGE_BROKER_URL}
SMQ_JAEGER_URL: ${SMQ_JAEGER_URL}
SMQ_JAEGER_TRACE_RATIO: ${SMQ_JAEGER_TRACE_RATIO}
@@ -65,9 +65,9 @@ services:
SMQ_SPICEDB_PRE_SHARED_KEY: ${SMQ_SPICEDB_PRE_SHARED_KEY}
SMQ_SPICEDB_HOST: ${SMQ_SPICEDB_HOST}
SMQ_SPICEDB_PORT: ${SMQ_SPICEDB_PORT}
SMQ_RE_INSTANCE_ID: ${SMQ_RE_INSTANCE_ID}
MG_RE_INSTANCE_ID: ${MG_RE_INSTANCE_ID}
ports:
- ${SMQ_RE_HTTP_PORT}:${SMQ_RE_HTTP_PORT}
- ${MG_RE_HTTP_PORT}:${MG_RE_HTTP_PORT}
networks:
- magistrala-base-net
volumes:
+52 -5
View File
@@ -86,27 +86,74 @@ func listRulesEndpoint(s re.Service) endpoint.Endpoint {
return rulesPageRes{}, nil
}
ret := rulesPageRes{
pageRes: pageRes{
Limit: page.Limit,
Offset: page.Offset,
Total: page.Total,
},
Rules: page.Rules,
}
return ret, nil
}
}
func upadateRuleStatusEndpoint(s re.Service) endpoint.Endpoint {
func deleteRuleEndpoint(s re.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
session, ok := ctx.Value(api.SessionKey).(authn.Session)
if !ok {
return nil, svcerr.ErrAuthorization
}
req := request.(changeRuleStatusReq)
req := request.(deleteRuleReq)
if err := req.validate(); err != nil {
return updateRoleStatusRes{}, err
return deleteRuleRes{}, err
}
err := s.RemoveRule(ctx, session, req.id)
if err != nil {
return updateRoleStatusRes{false}, err
return deleteRuleRes{false}, err
}
return updateRoleStatusRes{true}, nil
return deleteRuleRes{true}, nil
}
}
func enableRuleEndpoint(s re.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
session, ok := ctx.Value(api.SessionKey).(authn.Session)
if !ok {
return nil, svcerr.ErrAuthorization
}
req := request.(updateRuleStatusReq)
if err := req.validate(); err != nil {
return updateRuleStatusRes{}, err
}
rule, err := s.EnableRule(ctx, session, req.id)
if err != nil {
return updateRuleStatusRes{}, err
}
return updateRuleStatusRes{Rule: rule}, err
}
}
func disableRuleEndpoint(s re.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
session, ok := ctx.Value(api.SessionKey).(authn.Session)
if !ok {
return nil, svcerr.ErrAuthorization
}
req := request.(updateRuleStatusReq)
if err := req.validate(); err != nil {
return updateRuleStatusRes{}, err
}
rule, err := s.DisableRule(ctx, session, req.id)
if err != nil {
return updateRuleStatusRes{}, err
}
return updateRuleStatusRes{Rule: rule}, err
}
}
+25 -5
View File
@@ -9,13 +9,19 @@ import (
apiutil "github.com/absmach/supermq/api/http/util"
)
const maxLimitSize = 1000
const (
maxLimitSize = 1000
MaxNameSize = 1024
)
type addRuleReq struct {
re.Rule
}
func (req addRuleReq) validate() error {
if len(req.Name) > api.MaxNameSize || req.Name == "" {
return apiutil.ErrNameSize
}
return nil
}
@@ -57,16 +63,30 @@ func (req updateRuleReq) validate() error {
if len(req.Rule.Logic.Value) == 0 {
return apiutil.ErrEmptyList
}
if len(req.Rule.Name) > api.MaxNameSize || req.Rule.Name == "" {
return apiutil.ErrNameSize
}
return nil
}
type changeRuleStatusReq struct {
id string
status re.Status
type updateRuleStatusReq struct {
id string
}
func (req changeRuleStatusReq) validate() error {
func (req updateRuleStatusReq) validate() error {
if req.id == "" {
return apiutil.ErrMissingID
}
return nil
}
type deleteRuleReq struct {
id string
}
func (req deleteRuleReq) validate() error {
if req.id == "" {
return apiutil.ErrMissingID
}
+10 -10
View File
@@ -14,10 +14,10 @@ import (
var (
_ supermq.Response = (*viewRuleRes)(nil)
_ supermq.Response = (*addRuleRes)(nil)
_ supermq.Response = (*changeRuleStatusRes)(nil)
_ supermq.Response = (*updateRuleStatusRes)(nil)
_ supermq.Response = (*rulesPageRes)(nil)
_ supermq.Response = (*updateRuleRes)(nil)
_ supermq.Response = (*updateRoleStatusRes)(nil)
_ supermq.Response = (*deleteRuleRes)(nil)
)
type pageRes struct {
@@ -102,27 +102,27 @@ func (res rulesPageRes) Empty() bool {
return false
}
type changeRuleStatusRes struct {
type updateRuleStatusRes struct {
re.Rule `json:",inline"`
}
func (res changeRuleStatusRes) Code() int {
func (res updateRuleStatusRes) Code() int {
return http.StatusOK
}
func (res changeRuleStatusRes) Headers() map[string]string {
func (res updateRuleStatusRes) Headers() map[string]string {
return map[string]string{}
}
func (res changeRuleStatusRes) Empty() bool {
func (res updateRuleStatusRes) Empty() bool {
return false
}
type updateRoleStatusRes struct {
type deleteRuleRes struct {
deleted bool
}
func (res updateRoleStatusRes) Code() int {
func (res deleteRuleRes) Code() int {
if res.deleted {
return http.StatusNoContent
}
@@ -130,10 +130,10 @@ func (res updateRoleStatusRes) Code() int {
return http.StatusOK
}
func (res updateRoleStatusRes) Headers() map[string]string {
func (res deleteRuleRes) Headers() map[string]string {
return map[string]string{}
}
func (res updateRoleStatusRes) Empty() bool {
func (res deleteRuleRes) Empty() bool {
return true
}
+38 -15
View File
@@ -16,7 +16,7 @@ import (
apiutil "github.com/absmach/supermq/api/http/util"
mgauthn "github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/errors"
"github.com/go-chi/chi"
"github.com/go-chi/chi/v5"
kithttp "github.com/go-kit/kit/transport/http"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
@@ -66,12 +66,26 @@ func MakeHandler(svc re.Service, authn mgauthn.Authentication, logger *slog.Logg
opts...,
), "update_rule").ServeHTTP)
r.Put("/{ruleID}/status", otelhttp.NewHandler(kithttp.NewServer(
upadateRuleStatusEndpoint(svc),
r.Delete("/{ruleID}", otelhttp.NewHandler(kithttp.NewServer(
deleteRuleEndpoint(svc),
decodeDeleteRuleRequest,
api.EncodeResponse,
opts...,
), "delete_rule").ServeHTTP)
r.Put("/{ruleID}/enable", otelhttp.NewHandler(kithttp.NewServer(
enableRuleEndpoint(svc),
decodeUpdateRuleStatusRequest,
api.EncodeResponse,
opts...,
), "update_rule_status").ServeHTTP)
), "enable_rule").ServeHTTP)
r.Put("/{ruleID}/disable", otelhttp.NewHandler(kithttp.NewServer(
disableRuleEndpoint(svc),
decodeUpdateRuleStatusRequest,
api.EncodeResponse,
opts...,
), "disable_rule").ServeHTTP)
})
})
@@ -105,6 +119,13 @@ func decodeUpdateRuleRequest(_ context.Context, r *http.Request) (interface{}, e
return updateRuleReq{Rule: rule}, nil
}
func decodeUpdateRuleStatusRequest(_ context.Context, r *http.Request) (interface{}, error) {
req := updateRuleStatusReq{
id: chi.URLParam(r, idKey),
}
return req, nil
}
func decodeListRulesRequest(_ context.Context, r *http.Request) (interface{}, error) {
offset, err := apiutil.ReadNumQuery[uint64](r, api.OffsetKey, api.DefOffset)
if err != nil {
@@ -122,25 +143,27 @@ func decodeListRulesRequest(_ context.Context, r *http.Request) (interface{}, er
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
s, err := apiutil.ReadStringQuery(r, api.StatusKey, api.DefStatus)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
st, err := re.ToStatus(s)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
return listRulesReq{
PageMeta: re.PageMeta{
Offset: offset,
Limit: limit,
InputChannel: ic,
OutputChannel: oc,
Status: st,
},
}, nil
}
func decodeUpdateRuleStatusRequest(_ context.Context, r *http.Request) (interface{}, error) {
id := r.URL.Query().Get(idKey)
status, err := apiutil.ReadStringQuery(r, statusKey, re.AllStatus.String())
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
s, err := re.ToStatus(status)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
return changeRuleStatusReq{id: id, status: s}, nil
func decodeDeleteRuleRequest(_ context.Context, r *http.Request) (interface{}, error) {
id := chi.URLParam(r, idKey)
return deleteRuleReq{id: id}, nil
}
+67 -25
View File
@@ -17,27 +17,30 @@ import (
// SQL Queries as Strings.
const (
addRuleQuery = `
INSERT INTO rules (id, domain_id, input_channel, input_topic, logic_type, logic_value,
output_channel, output_topic, recurring_time, recurring_type, recurring_period, status)
VALUES (:id, :domain_id, :input_channel, :input_topic, :logic_type, :logic_value,
:output_channel, :output_topic, :recurring_time, :recurring_type, :recurring_period, :status)
RETURNING id;
INSERT INTO rules (id, name, domain_id, input_channel, input_topic, logic_type, logic_value,
output_channel, output_topic, recurring_time, recurring_type, recurring_period, created_at, updated_at, updated_by, status)
VALUES (:id, :name, :domain_id, :input_channel, :input_topic, :logic_type, :logic_value,
:output_channel, :output_topic, :recurring_time, :recurring_type, :recurring_period, :created_at, :updated_at, :updated_by, :status)
RETURNING id, name, domain_id, input_channel, input_topic, logic_type, logic_value,
output_channel, output_topic, recurring_time, recurring_type, recurring_period, created_at, updated_at, updated_by, status;
`
viewRuleQuery = `
SELECT id, domain_id, input_channel, input_topic, logic_type, logic_value, output_channel,
output_topic, recurring_time, recurring_type, recurring_period, status
SELECT id, name, domain_id, input_channel, input_topic, logic_type, logic_value, output_channel,
output_topic, recurring_time, recurring_type, recurring_period, created_at, updated_at, updated_by, status
FROM rules
WHERE id = $1;
`
updateRuleQuery = `
UPDATE rules
SET input_channel = :input_channel, input_topic = :input_topic, logic_type = :logic_type,
SET name = :name, input_channel = :input_channel, input_topic = :input_topic, logic_type = :logic_type,
logic_value = :logic_value, output_channel = :output_channel, output_topic = :output_topic,
recurring_time = :recurring_time, recurring_type = :recurring_type,
recurring_period = :recurring_period, status = :status
WHERE id = :id;
recurring_period = :recurring_period, updated_at = :updated_at, updated_by = :updated_by, status = :status
WHERE id = :id
RETURNING id, name, domain_id, input_channel, input_topic, logic_type, logic_value,
output_channel, output_topic, recurring_time, recurring_type, recurring_period, created_at, updated_at, updated_by, status;
`
removeRuleQuery = `
@@ -45,10 +48,18 @@ const (
WHERE id = $1;
`
updateRuleStatusQuery = `
UPDATE rules
SET status = $2
WHERE id = $1
RETURNING id, name, domain_id, input_channel, input_topic, logic_type, logic_value,
output_channel, output_topic, recurring_time, recurring_type, recurring_period, created_at, updated_at, updated_by, status;
`
listRulesQuery = `
SELECT id, domain_id, input_channel, input_topic, logic_type, logic_value, output_channel,
output_topic, recurring_time, recurring_type, recurring_period, status
FROM rules r %s %s;
SELECT id, name, domain_id, input_channel, input_topic, logic_type, logic_value, output_channel,
output_topic, recurring_time, recurring_type, recurring_period, created_at, updated_at, updated_by, status
FROM rules r %s %s;
`
totalQuery = `SELECT COUNT(*) FROM rules r %s;`
@@ -64,11 +75,22 @@ func NewRepository(db postgres.Database) re.Repository {
func (repo *PostgresRepository) AddRule(ctx context.Context, r re.Rule) (re.Rule, error) {
dbr := ruleToDb(r)
_, err := repo.DB.NamedExecContext(ctx, addRuleQuery, dbr)
row, err := repo.DB.NamedQueryContext(ctx, addRuleQuery, dbr)
if err != nil {
return re.Rule{}, err
}
return r, nil
defer row.Close()
var dbRule dbRule
if row.Next() {
if err := row.StructScan(&dbRule); err != nil {
return re.Rule{}, err
}
}
rule := dbToRule(dbRule)
return rule, nil
}
func (repo *PostgresRepository) ViewRule(ctx context.Context, id string) (re.Rule, error) {
@@ -77,8 +99,7 @@ func (repo *PostgresRepository) ViewRule(ctx context.Context, id string) (re.Rul
return re.Rule{}, err
}
var dbr dbRule
err := row.StructScan(&dbr)
if err != nil {
if err := row.StructScan(&dbr); err != nil {
return re.Rule{}, err
}
ret := dbToRule(dbr)
@@ -86,18 +107,39 @@ func (repo *PostgresRepository) ViewRule(ctx context.Context, id string) (re.Rul
return ret, nil
}
func (repo *PostgresRepository) UpdateRule(ctx context.Context, r re.Rule) (re.Rule, error) {
dbr := ruleToDb(r)
result, err := repo.DB.NamedExecContext(ctx, updateRuleQuery, dbr)
if err != nil {
func (repo *PostgresRepository) UpdateRuleStatus(ctx context.Context, id string, status re.Status) (re.Rule, error) {
row := repo.DB.QueryRowxContext(ctx, updateRuleStatusQuery, id, status)
if err := row.Err(); err != nil {
return re.Rule{}, err
}
if _, err := result.RowsAffected(); err != nil {
return re.Rule{}, repoerr.ErrNotFound
var dbr dbRule
if err := row.StructScan(&dbr); err != nil {
return re.Rule{}, err
}
return r, nil
rule := dbToRule(dbr)
return rule, nil
}
func (repo *PostgresRepository) UpdateRule(ctx context.Context, r re.Rule) (re.Rule, error) {
dbr := ruleToDb(r)
row, err := repo.DB.NamedQueryContext(ctx, updateRuleQuery, dbr)
if err != nil {
return re.Rule{}, err
}
defer row.Close()
var dbRule dbRule
if row.Next() {
if err := row.StructScan(&dbRule); err != nil {
return re.Rule{}, err
}
}
rule := dbToRule(dbRule)
return rule, nil
}
func (repo *PostgresRepository) RemoveRule(ctx context.Context, id string) error {
@@ -119,7 +161,7 @@ func (repo *PostgresRepository) ListRules(ctx context.Context, pm re.PageMeta) (
pgData = "LIMIT :limit"
}
if pm.Offset != 0 {
pgData += " OFFEST :offset"
pgData += " OFFSET :offset"
}
pq := pageQuery(pm)
q := fmt.Sprintf(listRulesQuery, pq, pgData)
+3
View File
@@ -14,6 +14,7 @@ import (
// dbRule represents the database structure for a Rule.
type dbRule struct {
ID string `db:"id"`
Name string `db:"name"`
DomainID string `db:"domain_id"`
InputChannel string `db:"input_channel"`
InputTopic sql.NullString `db:"input_topic"`
@@ -34,6 +35,7 @@ type dbRule struct {
func ruleToDb(r re.Rule) dbRule {
return dbRule{
ID: r.ID,
Name: r.Name,
DomainID: r.DomainID,
InputChannel: r.InputChannel,
InputTopic: toNullString(r.InputTopic),
@@ -55,6 +57,7 @@ func ruleToDb(r re.Rule) dbRule {
func dbToRule(dto dbRule) re.Rule {
return re.Rule{
ID: dto.ID,
Name: dto.Name,
DomainID: dto.DomainID,
InputChannel: dto.InputChannel,
InputTopic: fromNullString(dto.InputTopic),
+25 -3
View File
@@ -33,13 +33,14 @@ const (
)
type Schedule struct {
Time []time.Time `json:"date,omitempty"`
RecurringType ReccuringType
RecurringPeriod uint // 1 meaning every Recurring value, 2 meaning every other, and so on.
Time []time.Time `json:"date,omitempty"`
RecurringType ReccuringType `json:"recurring_type"`
RecurringPeriod uint `json:"recurring_period"` // 1 meaning every Recurring value, 2 meaning every other, and so on.
}
type Rule struct {
ID string `json:"id"`
Name string `json:"name"`
DomainID string `json:"domain"`
InputChannel string `json:"input_channel"`
InputTopic string `json:"input_topic"`
@@ -59,6 +60,7 @@ type Repository interface {
ViewRule(ctx context.Context, id string) (Rule, error)
UpdateRule(ctx context.Context, r Rule) (Rule, error)
RemoveRule(ctx context.Context, id string) error
UpdateRuleStatus(ctx context.Context, id string, status Status) (Rule, error)
ListRules(ctx context.Context, pm PageMeta) (Page, error)
}
@@ -86,6 +88,8 @@ type Service interface {
UpdateRule(ctx context.Context, session authn.Session, r Rule) (Rule, error)
ListRules(ctx context.Context, session authn.Session, pm PageMeta) (Page, error)
RemoveRule(ctx context.Context, session authn.Session, id string) error
EnableRule(ctx context.Context, session authn.Session, id string) (Rule, error)
DisableRule(ctx context.Context, session authn.Session, id string) (Rule, error)
}
type re struct {
@@ -122,6 +126,8 @@ func (re *re) ViewRule(ctx context.Context, session authn.Session, id string) (R
}
func (re *re) UpdateRule(ctx context.Context, session authn.Session, r Rule) (Rule, error) {
r.UpdatedAt = time.Now()
r.UpdatedBy = session.UserID
return re.repo.UpdateRule(ctx, r)
}
@@ -133,6 +139,22 @@ func (re *re) RemoveRule(ctx context.Context, session authn.Session, id string)
return re.repo.RemoveRule(ctx, id)
}
func (re *re) EnableRule(ctx context.Context, session authn.Session, id string) (Rule, error) {
status, err := ToStatus(Enabled)
if err != nil {
return Rule{}, err
}
return re.repo.UpdateRuleStatus(ctx, id, status)
}
func (re *re) DisableRule(ctx context.Context, session authn.Session, id string) (Rule, error) {
status, err := ToStatus(Disabled)
if err != nil {
return Rule{}, err
}
return re.repo.UpdateRuleStatus(ctx, id, status)
}
func (re *re) ConsumeAsync(ctx context.Context, msgs interface{}) {
switch m := msgs.(type) {
case *messaging.Message: