MG-154 - Add Tags in Rule Engine listing and support filter by tags (#212)

* add tags to repo layer

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* update service layer

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* update api layer

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* update tests

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* address comments

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix postgres methods

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

* fix logic

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>

---------

Signed-off-by: nyagamunene <stevenyaga2014@gmail.com>
This commit is contained in:
Steve Munene
2025-06-20 17:28:24 +03:00
committed by GitHub
parent d66e5ce329
commit 184a9161d6
14 changed files with 542 additions and 112 deletions
+27
View File
@@ -8,7 +8,9 @@ import (
"github.com/absmach/magistrala/re"
api "github.com/absmach/supermq/api/http"
apiutil "github.com/absmach/supermq/api/http/util"
"github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/errors"
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/go-kit/kit/endpoint"
)
@@ -70,6 +72,31 @@ func updateRuleEndpoint(s re.Service) endpoint.Endpoint {
}
}
func updateRuleTagsEndpoint(svc re.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(updateRuleTagsReq)
if err := req.validate(); err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
session, ok := ctx.Value(api.SessionKey).(authn.Session)
if !ok {
return nil, svcerr.ErrAuthentication
}
r := re.Rule{
ID: req.id,
Tags: req.Tags,
}
res, err := svc.UpdateRuleTags(ctx, session, r)
if err != nil {
return nil, err
}
return updateRuleRes{Rule: res}, nil
}
}
func updateRuleScheduleEndpoint(s re.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
session, ok := ctx.Value(api.SessionKey).(authn.Session)
+148
View File
@@ -496,6 +496,14 @@ func TestListRulesEndpoint(t *testing.T) {
status: http.StatusBadRequest,
err: apiutil.ErrInvalidQueryParams,
},
{
desc: "list rules with duplicate tags",
domainID: domainID,
token: validToken,
query: "tag=tag1&tag=tag2",
status: http.StatusBadRequest,
err: apiutil.ErrInvalidQueryParams,
},
}
for _, tc := range cases {
@@ -675,6 +683,146 @@ func TestUpdateRulesEndpoint(t *testing.T) {
}
}
func TestUpdateRuleTagsEndpoint(t *testing.T) {
ts, svc, authn := newRuleEngineServer()
defer ts.Close()
newTag := "newtag"
cases := []struct {
desc string
token string
id string
domainID string
data string
contentType string
session smqauthn.Session
svcResp re.Rule
svcErr error
resp re.Rule
status int
authnErr error
err error
}{
{
desc: "update rule tags successfully",
token: validToken,
domainID: domainID,
id: validID,
data: fmt.Sprintf(`{"tags":["%s"]}`, newTag),
contentType: contentType,
svcResp: rule,
status: http.StatusOK,
err: nil,
},
{
desc: "update rule tags with invalid token",
token: invalidToken,
session: smqauthn.Session{},
domainID: domainID,
id: validID,
data: fmt.Sprintf(`{"tags":["%s"]}`, newTag),
contentType: contentType,
authnErr: svcerr.ErrAuthentication,
status: http.StatusUnauthorized,
err: svcerr.ErrAuthentication,
},
{
desc: "update rule tags with empty token",
token: "",
session: smqauthn.Session{},
domainID: domainID,
id: validID,
data: fmt.Sprintf(`{"tags":["%s"]}`, newTag),
contentType: contentType,
status: http.StatusUnauthorized,
err: apiutil.ErrBearerToken,
},
{
desc: "update rule tags with empty domainID",
token: validToken,
id: validID,
data: fmt.Sprintf(`{"tags":["%s"]}`, newTag),
contentType: contentType,
status: http.StatusBadRequest,
err: apiutil.ErrMissingDomainID,
},
{
desc: "update rule tags with invalid content type",
token: validToken,
id: validID,
domainID: domainID,
data: fmt.Sprintf(`{"tags":["%s"]}`, newTag),
contentType: "application/xml",
svcResp: rule,
status: http.StatusUnsupportedMediaType,
err: apiutil.ErrUnsupportedContentType,
},
{
desc: "update rule tags with service error",
token: validToken,
id: validID,
domainID: domainID,
data: fmt.Sprintf(`{"tags":["%s"]}`, newTag),
contentType: contentType,
svcResp: re.Rule{},
svcErr: svcerr.ErrAuthorization,
status: http.StatusForbidden,
err: svcerr.ErrAuthorization,
},
{
desc: "update rule with malformed request",
token: validToken,
id: validID,
domainID: domainID,
contentType: contentType,
data: fmt.Sprintf(`{"tags":["%s"}`, newTag),
status: http.StatusBadRequest,
err: errors.ErrMalformedEntity,
},
{
desc: "update rule with empty id",
token: validToken,
id: "",
domainID: domainID,
contentType: contentType,
data: fmt.Sprintf(`{"tags":["%s"]}`, newTag),
status: http.StatusBadRequest,
err: apiutil.ErrMissingID,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
req := testRequest{
client: ts.Client(),
method: http.MethodPatch,
url: fmt.Sprintf("%s/%s/rules/%s/tags", ts.URL, tc.domainID, tc.id),
contentType: tc.contentType,
token: tc.token,
body: strings.NewReader(tc.data),
}
if tc.token == validToken {
tc.session = smqauthn.Session{DomainUserID: auth.EncodeDomainUserID(domainID, userID), UserID: userID, DomainID: domainID}
}
authCall := authn.On("Authenticate", mock.Anything, tc.token).Return(tc.session, tc.authnErr)
svcCall := svc.On("UpdateRuleTags", mock.Anything, tc.session, re.Rule{ID: tc.id, Tags: []string{newTag}}).Return(tc.svcResp, tc.svcErr)
res, err := req.make()
assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", tc.desc, err))
var errRes respBody
err = json.NewDecoder(res.Body).Decode(&errRes)
assert.Nil(t, err, fmt.Sprintf("%s: unexpected error while decoding response body: %s", tc.desc, err))
if errRes.Err != "" || errRes.Message != "" {
err = errors.Wrap(errors.New(errRes.Err), errors.New(errRes.Message))
}
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("%s: expected %s got %s\n", tc.desc, tc.err, err))
assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected status code %d got %d", tc.desc, tc.status, res.StatusCode))
svcCall.Unset()
authCall.Unset()
})
}
}
func TestEnableRuleEndpoint(t *testing.T) {
ts, svc, authn := newRuleEngineServer()
defer ts.Close()
+13
View File
@@ -69,6 +69,19 @@ func (req updateRuleReq) validate() error {
return nil
}
type updateRuleTagsReq struct {
id string
Tags []string `json:"tags,omitempty"`
}
func (req updateRuleTagsReq) validate() error {
if req.id == "" {
return apiutil.ErrMissingID
}
return nil
}
type updateRuleScheduleReq struct {
id string
Schedule schedule.Schedule `json:"schedule,omitempty"`
+27
View File
@@ -70,6 +70,13 @@ func MakeHandler(svc re.Service, authn mgauthn.Authentication, mux *chi.Mux, log
opts...,
), "update_rule").ServeHTTP)
r.Patch("/tags", otelhttp.NewHandler(kithttp.NewServer(
updateRuleTagsEndpoint(svc),
decodeUpdateRuleTags,
api.EncodeResponse,
opts...,
), "update_rule_tags").ServeHTTP)
r.Patch("/schedule", otelhttp.NewHandler(kithttp.NewServer(
updateRuleScheduleEndpoint(svc),
decodeUpdateRuleScheduleRequest,
@@ -136,6 +143,21 @@ func decodeUpdateRuleRequest(_ context.Context, r *http.Request) (interface{}, e
return updateRuleReq{Rule: rule}, nil
}
func decodeUpdateRuleTags(_ context.Context, r *http.Request) (interface{}, error) {
if !strings.Contains(r.Header.Get("Content-Type"), api.ContentType) {
return nil, errors.Wrap(apiutil.ErrValidation, apiutil.ErrUnsupportedContentType)
}
req := updateRuleTagsReq{
id: chi.URLParam(r, ruleIdKey),
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, errors.Wrap(errors.ErrMalformedEntity, err))
}
return req, nil
}
func decodeUpdateRuleScheduleRequest(_ context.Context, r *http.Request) (interface{}, error) {
if !strings.Contains(r.Header.Get("Content-Type"), api.ContentType) {
return nil, errors.Wrap(apiutil.ErrValidation, apiutil.ErrUnsupportedContentType)
@@ -191,6 +213,10 @@ func decodeListRulesRequest(_ context.Context, r *http.Request) (interface{}, er
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
tag, err := apiutil.ReadStringQuery(r, api.TagKey, "")
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
return listRulesReq{
PageMeta: re.PageMeta{
Offset: offset,
@@ -200,6 +226,7 @@ func decodeListRulesRequest(_ context.Context, r *http.Request) (interface{}, er
OutputChannel: oc,
Status: st,
Dir: dir,
Tag: tag,
},
}, nil
}
+16
View File
@@ -82,6 +82,22 @@ func (am *authorizationMiddleware) UpdateRule(ctx context.Context, session authn
return am.svc.UpdateRule(ctx, session, r)
}
func (am *authorizationMiddleware) UpdateRuleTags(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) {
if err := am.authorize(ctx, smqauthz.PolicyReq{
Domain: session.DomainID,
SubjectType: policies.UserType,
SubjectKind: policies.UsersKind,
Subject: session.DomainUserID,
Object: session.DomainID,
ObjectType: policies.DomainType,
Permission: policies.MembershipPermission,
}); err != nil {
return re.Rule{}, errors.Wrap(errDomainUpdateRules, err)
}
return am.svc.UpdateRuleTags(ctx, session, r)
}
func (am *authorizationMiddleware) UpdateRuleSchedule(ctx context.Context, session authn.Session, r re.Rule) (re.Rule, error) {
if err := am.authorize(ctx, smqauthz.PolicyReq{
Domain: session.DomainID,
+20
View File
@@ -82,6 +82,26 @@ func (lm *loggingMiddleware) UpdateRule(ctx context.Context, session authn.Sessi
return lm.svc.UpdateRule(ctx, session, r)
}
func (lm *loggingMiddleware) UpdateRuleTags(ctx context.Context, session authn.Session, r re.Rule) (res re.Rule, err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
slog.String("domain_id", session.DomainID),
slog.Group("rule",
slog.String("id", r.ID),
slog.String("name", r.Name),
),
}
if err != nil {
args = append(args, slog.String("error", err.Error()))
lm.logger.Warn("Update rule failed", args...)
return
}
lm.logger.Info("Update rule tags completed successfully", args...)
}(time.Now())
return lm.svc.UpdateRuleTags(ctx, session, r)
}
func (lm *loggingMiddleware) UpdateRuleSchedule(ctx context.Context, session authn.Session, r re.Rule) (res re.Rule, err error) {
defer func(begin time.Time) {
args := []any{
+55
View File
@@ -419,6 +419,61 @@ func (_c *Repository_UpdateRuleStatus_Call) RunAndReturn(run func(ctx context.Co
return _c
}
// UpdateRuleTags provides a mock function for the type Repository
func (_mock *Repository) UpdateRuleTags(ctx context.Context, r re.Rule) (re.Rule, error) {
ret := _mock.Called(ctx, r)
if len(ret) == 0 {
panic("no return value specified for UpdateRuleTags")
}
var r0 re.Rule
var r1 error
if returnFunc, ok := ret.Get(0).(func(context.Context, re.Rule) (re.Rule, error)); ok {
return returnFunc(ctx, r)
}
if returnFunc, ok := ret.Get(0).(func(context.Context, re.Rule) re.Rule); ok {
r0 = returnFunc(ctx, r)
} else {
r0 = ret.Get(0).(re.Rule)
}
if returnFunc, ok := ret.Get(1).(func(context.Context, re.Rule) error); ok {
r1 = returnFunc(ctx, r)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Repository_UpdateRuleTags_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateRuleTags'
type Repository_UpdateRuleTags_Call struct {
*mock.Call
}
// UpdateRuleTags is a helper method to define mock.On call
// - ctx
// - r
func (_e *Repository_Expecter) UpdateRuleTags(ctx interface{}, r interface{}) *Repository_UpdateRuleTags_Call {
return &Repository_UpdateRuleTags_Call{Call: _e.mock.On("UpdateRuleTags", ctx, r)}
}
func (_c *Repository_UpdateRuleTags_Call) Run(run func(ctx context.Context, r re.Rule)) *Repository_UpdateRuleTags_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(re.Rule))
})
return _c
}
func (_c *Repository_UpdateRuleTags_Call) Return(rule re.Rule, err error) *Repository_UpdateRuleTags_Call {
_c.Call.Return(rule, err)
return _c
}
func (_c *Repository_UpdateRuleTags_Call) RunAndReturn(run func(ctx context.Context, r re.Rule) (re.Rule, error)) *Repository_UpdateRuleTags_Call {
_c.Call.Return(run)
return _c
}
// ViewRule provides a mock function for the type Repository
func (_mock *Repository) ViewRule(ctx context.Context, id string) (re.Rule, error) {
ret := _mock.Called(ctx, id)
+56
View File
@@ -560,6 +560,62 @@ func (_c *Service_UpdateRuleSchedule_Call) RunAndReturn(run func(ctx context.Con
return _c
}
// UpdateRuleTags provides a mock function for the type Service
func (_mock *Service) UpdateRuleTags(ctx context.Context, session authn.Session, channel re.Rule) (re.Rule, error) {
ret := _mock.Called(ctx, session, channel)
if len(ret) == 0 {
panic("no return value specified for UpdateRuleTags")
}
var r0 re.Rule
var r1 error
if returnFunc, ok := ret.Get(0).(func(context.Context, authn.Session, re.Rule) (re.Rule, error)); ok {
return returnFunc(ctx, session, channel)
}
if returnFunc, ok := ret.Get(0).(func(context.Context, authn.Session, re.Rule) re.Rule); ok {
r0 = returnFunc(ctx, session, channel)
} else {
r0 = ret.Get(0).(re.Rule)
}
if returnFunc, ok := ret.Get(1).(func(context.Context, authn.Session, re.Rule) error); ok {
r1 = returnFunc(ctx, session, channel)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Service_UpdateRuleTags_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateRuleTags'
type Service_UpdateRuleTags_Call struct {
*mock.Call
}
// UpdateRuleTags is a helper method to define mock.On call
// - ctx
// - session
// - channel
func (_e *Service_Expecter) UpdateRuleTags(ctx interface{}, session interface{}, channel interface{}) *Service_UpdateRuleTags_Call {
return &Service_UpdateRuleTags_Call{Call: _e.mock.On("UpdateRuleTags", ctx, session, channel)}
}
func (_c *Service_UpdateRuleTags_Call) Run(run func(ctx context.Context, session authn.Session, channel re.Rule)) *Service_UpdateRuleTags_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(authn.Session), args[2].(re.Rule))
})
return _c
}
func (_c *Service_UpdateRuleTags_Call) Return(rule re.Rule, err error) *Service_UpdateRuleTags_Call {
_c.Call.Return(rule, err)
return _c
}
func (_c *Service_UpdateRuleTags_Call) RunAndReturn(run func(ctx context.Context, session authn.Session, channel re.Rule) (re.Rule, error)) *Service_UpdateRuleTags_Call {
_c.Call.Return(run)
return _c
}
// ViewRule provides a mock function for the type Service
func (_mock *Service) ViewRule(ctx context.Context, session authn.Session, id string) (re.Rule, error) {
ret := _mock.Called(ctx, session, id)
+9
View File
@@ -43,6 +43,15 @@ func Migration() *migrate.MemoryMigrationSource {
`DROP TABLE IF EXISTS rules`,
},
},
{
Id: "rules_02",
Up: []string{
`ALTER TABLE rules ADD COLUMN tags TEXT[];`,
},
Down: []string{
`ALTER TABLE rules DROP COLUMN tags;`,
},
},
},
}
}
+34 -61
View File
@@ -26,11 +26,11 @@ func NewRepository(db postgres.Database) re.Repository {
func (repo *PostgresRepository) AddRule(ctx context.Context, r re.Rule) (re.Rule, error) {
q := `
INSERT INTO rules (id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
INSERT INTO rules (id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status)
VALUES (:id, :name, :domain_id, :metadata, :input_channel, :input_topic, :logic_type, :logic_output, :logic_value,
VALUES (:id, :name, :domain_id, :tags, :metadata, :input_channel, :input_topic, :logic_type, :logic_output, :logic_value,
:output_channel, :output_topic, :start_datetime, :time, :recurring, :recurring_period, :created_at, :created_by, :updated_at, :updated_by, :status)
RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
RETURNING id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;
`
dbr, err := ruleToDb(r)
@@ -60,7 +60,7 @@ func (repo *PostgresRepository) AddRule(ctx context.Context, r re.Rule) (re.Rule
func (repo *PostgresRepository) ViewRule(ctx context.Context, id string) (re.Rule, error) {
q := `
SELECT id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_output, logic_value, output_channel,
SELECT id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_output, logic_value, output_channel,
output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status
FROM rules
WHERE id = $1;
@@ -85,35 +85,10 @@ func (repo *PostgresRepository) UpdateRuleStatus(ctx context.Context, r re.Rule)
q := `UPDATE rules
SET status = :status, updated_at = :updated_at, updated_by = :updated_by
WHERE id = :id
RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
RETURNING id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;`
dbr, err := ruleToDb(r)
if err != nil {
return re.Rule{}, errors.Wrap(repoerr.ErrUpdateEntity, err)
}
row, err := repo.DB.NamedQueryContext(ctx, q, dbr)
if err != nil {
return re.Rule{}, postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
defer row.Close()
var res dbRule
if row.Next() {
if err := row.StructScan(&res); err != nil {
return re.Rule{}, errors.Wrap(repoerr.ErrUpdateEntity, err)
}
rule, err := dbToRule(res)
if err != nil {
return re.Rule{}, errors.Wrap(repoerr.ErrUpdateEntity, err)
}
return rule, nil
}
return re.Rule{}, repoerr.ErrNotFound
return repo.update(ctx, r, q)
}
func (repo *PostgresRepository) UpdateRule(ctx context.Context, r re.Rule) (re.Rule, error) {
@@ -150,32 +125,21 @@ func (repo *PostgresRepository) UpdateRule(ctx context.Context, r re.Rule) (re.R
q := fmt.Sprintf(`
UPDATE rules
SET %s updated_at = :updated_at, updated_by = :updated_by WHERE id = :id
RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
RETURNING id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;
`, upq)
dbr, err := ruleToDb(r)
if err != nil {
return re.Rule{}, errors.Wrap(repoerr.ErrUpdateEntity, err)
}
row, err := repo.DB.NamedQueryContext(ctx, q, dbr)
if err != nil {
return re.Rule{}, postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
defer row.Close()
return repo.update(ctx, r, q)
}
var dbRule dbRule
if row.Next() {
if err := row.StructScan(&dbRule); err != nil {
return re.Rule{}, errors.Wrap(repoerr.ErrUpdateEntity, err)
}
}
rule, err := dbToRule(dbRule)
if err != nil {
return re.Rule{}, errors.Wrap(repoerr.ErrUpdateEntity, err)
}
func (repo *PostgresRepository) UpdateRuleTags(ctx context.Context, r re.Rule) (re.Rule, error) {
q := `UPDATE rules SET tags = :tags, updated_at = :updated_at, updated_by = :updated_by
WHERE id = :id AND status = :status
RETURNING id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;`
r.Status = re.EnabledStatus
return rule, nil
return repo.update(ctx, r, q)
}
func (repo *PostgresRepository) UpdateRuleSchedule(ctx context.Context, r re.Rule) (re.Rule, error) {
@@ -183,14 +147,19 @@ func (repo *PostgresRepository) UpdateRuleSchedule(ctx context.Context, r re.Rul
UPDATE rules
SET start_datetime = :start_datetime, time = :time, recurring = :recurring,
recurring_period = :recurring_period, updated_at = :updated_at, updated_by = :updated_by WHERE id = :id
RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
RETURNING id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;
`
return repo.update(ctx, r, q)
}
func (repo *PostgresRepository) update(ctx context.Context, r re.Rule, query string) (re.Rule, error) {
dbr, err := ruleToDb(r)
if err != nil {
return re.Rule{}, errors.Wrap(repoerr.ErrUpdateEntity, err)
}
row, err := repo.DB.NamedQueryContext(ctx, q, dbr)
row, err := repo.DB.NamedQueryContext(ctx, query, dbr)
if err != nil {
return re.Rule{}, postgres.HandleError(repoerr.ErrUpdateEntity, err)
}
@@ -201,13 +170,14 @@ func (repo *PostgresRepository) UpdateRuleSchedule(ctx context.Context, r re.Rul
if err := row.StructScan(&dbRule); err != nil {
return re.Rule{}, errors.Wrap(repoerr.ErrUpdateEntity, err)
}
}
rule, err := dbToRule(dbRule)
if err != nil {
return re.Rule{}, errors.Wrap(repoerr.ErrUpdateEntity, err)
rule, err := dbToRule(dbRule)
if err != nil {
return re.Rule{}, errors.Wrap(repoerr.ErrUpdateEntity, err)
}
return rule, nil
}
return rule, nil
return re.Rule{}, repoerr.ErrNotFound
}
func (repo *PostgresRepository) RemoveRule(ctx context.Context, id string) error {
@@ -242,7 +212,7 @@ func (repo *PostgresRepository) ListRules(ctx context.Context, pm re.PageMeta) (
}
pq := pageRulesQuery(pm)
q := fmt.Sprintf(`
SELECT id, name, domain_id, input_channel, input_topic, logic_type, logic_output, logic_value, output_channel,
SELECT id, name, domain_id, tags, input_channel, input_topic, logic_type, logic_output, logic_value, output_channel,
output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status
FROM rules r %s %s;
`, pq, pgData)
@@ -285,7 +255,7 @@ func (repo *PostgresRepository) UpdateRuleDue(ctx context.Context, id string, du
q := `
UPDATE rules
SET time = :time, updated_at = :updated_at WHERE id = :id
RETURNING id, name, domain_id, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
RETURNING id, name, domain_id, tags, metadata, input_channel, input_topic, logic_type, logic_output, logic_value,
output_channel, output_topic, start_datetime, time, recurring, recurring_period, created_at, created_by, updated_at, updated_by, status;
`
dbr := dbRule{
@@ -333,6 +303,9 @@ func pageRulesQuery(pm re.PageMeta) string {
if pm.Domain != "" {
query = append(query, "r.domain_id = :domain_id")
}
if pm.Tag != "" {
query = append(query, "EXISTS (SELECT 1 FROM unnest(tags) AS tag WHERE tag ILIKE '%' || :tag || '%')")
}
if pm.ScheduledBefore != nil {
query = append(query, "r.time < :scheduled_before")
}
+12
View File
@@ -11,6 +11,7 @@ import (
"github.com/absmach/magistrala/pkg/schedule"
"github.com/absmach/magistrala/re"
"github.com/absmach/supermq/pkg/errors"
"github.com/jackc/pgtype"
"github.com/lib/pq"
)
@@ -19,6 +20,7 @@ type dbRule struct {
ID string `db:"id"`
Name string `db:"name"`
DomainID string `db:"domain_id"`
Tags pgtype.TextArray `db:"tags,omitempty"`
Metadata []byte `db:"metadata,omitempty"`
InputChannel string `db:"input_channel"`
InputTopic sql.NullString `db:"input_topic"`
@@ -59,10 +61,15 @@ func ruleToDb(r re.Rule) (dbRule, error) {
if !r.Schedule.Time.IsZero() {
t.Valid = true
}
var tags pgtype.TextArray
if err := tags.Set(r.Tags); err != nil {
return dbRule{}, err
}
return dbRule{
ID: r.ID,
Name: r.Name,
DomainID: r.DomainID,
Tags: tags,
Metadata: metadata,
InputChannel: r.InputChannel,
InputTopic: toNullString(r.InputTopic),
@@ -94,10 +101,15 @@ func dbToRule(dto dbRule) (re.Rule, error) {
for _, v := range dto.LogicOutputs {
lo = append(lo, re.ScriptOutput(v))
}
var tags []string
for _, e := range dto.Tags.Elements {
tags = append(tags, e.String)
}
return re.Rule{
ID: dto.ID,
Name: dto.Name,
DomainID: dto.DomainID,
Tags: tags,
Metadata: metadata,
InputChannel: dto.InputChannel,
InputTopic: fromNullString(dto.InputTopic),
+57
View File
@@ -4,12 +4,15 @@
package re
import (
"context"
"encoding/json"
"strings"
"time"
"github.com/absmach/magistrala/pkg/schedule"
"github.com/absmach/supermq/pkg/authn"
"github.com/absmach/supermq/pkg/errors"
"github.com/absmach/supermq/pkg/messaging"
)
const (
@@ -87,6 +90,7 @@ type Rule struct {
Name string `json:"name"`
DomainID string `json:"domain"`
Metadata Metadata `json:"metadata,omitempty"`
Tags []string `json:"tags,omitempty"`
InputChannel string `json:"input_channel"`
InputTopic string `json:"input_topic"`
Logic Script `json:"logic"`
@@ -99,3 +103,56 @@ type Rule struct {
UpdatedAt time.Time `json:"updated_at"`
UpdatedBy string `json:"updated_by"`
}
// PageMeta contains page metadata that helps navigation.
type PageMeta struct {
Total uint64 `json:"total" db:"total"`
Offset uint64 `json:"offset" db:"offset"`
Limit uint64 `json:"limit" db:"limit"`
Dir string `json:"dir" db:"dir"`
Name string `json:"name" db:"name"`
InputChannel string `json:"input_channel,omitempty" db:"input_channel"`
InputTopic *string `json:"input_topic,omitempty" db:"input_topic"`
Scheduled *bool `json:"scheduled,omitempty"`
OutputChannel string `json:"output_channel,omitempty" db:"output_channel"`
Status Status `json:"status,omitempty" db:"status"`
Domain string `json:"domain_id,omitempty" db:"domain_id"`
Tag string `json:"tag,omitempty"`
ScheduledBefore *time.Time `json:"scheduled_before,omitempty" db:"scheduled_before"` // Filter rules scheduled before this time
ScheduledAfter *time.Time `json:"scheduled_after,omitempty" db:"scheduled_after"` // Filter rules scheduled after this time
Recurring *schedule.Recurring `json:"recurring,omitempty" db:"recurring"` // Filter by recurring type
}
type Page struct {
Offset uint64 `json:"offset"`
Limit uint64 `json:"limit"`
Total uint64 `json:"total"`
Rules []Rule `json:"rules"`
}
type Service interface {
messaging.MessageHandler
AddRule(ctx context.Context, session authn.Session, r Rule) (Rule, error)
ViewRule(ctx context.Context, session authn.Session, id string) (Rule, error)
UpdateRule(ctx context.Context, session authn.Session, r Rule) (Rule, error)
UpdateRuleTags(ctx context.Context, session authn.Session, r Rule) (Rule, error)
UpdateRuleSchedule(ctx context.Context, session authn.Session, r Rule) (Rule, error)
ListRules(ctx context.Context, session authn.Session, pm PageMeta) (Page, error)
RemoveRule(ctx context.Context, session authn.Session, id string) error
EnableRule(ctx context.Context, session authn.Session, id string) (Rule, error)
DisableRule(ctx context.Context, session authn.Session, id string) (Rule, error)
StartScheduler(ctx context.Context) error
}
type Repository interface {
AddRule(ctx context.Context, r Rule) (Rule, error)
ViewRule(ctx context.Context, id string) (Rule, error)
UpdateRule(ctx context.Context, r Rule) (Rule, error)
UpdateRuleTags(ctx context.Context, r Rule) (Rule, error)
UpdateRuleSchedule(ctx context.Context, r Rule) (Rule, error)
RemoveRule(ctx context.Context, id string) error
UpdateRuleStatus(ctx context.Context, r Rule) (Rule, error)
ListRules(ctx context.Context, pm PageMeta) (Page, error)
UpdateRuleDue(ctx context.Context, id string, due time.Time) (Rule, error)
}
+11 -51
View File
@@ -10,7 +10,6 @@ import (
grpcReadersV1 "github.com/absmach/magistrala/api/grpc/readers/v1"
"github.com/absmach/magistrala/pkg/emailer"
pkglog "github.com/absmach/magistrala/pkg/logger"
"github.com/absmach/magistrala/pkg/schedule"
"github.com/absmach/magistrala/pkg/ticker"
"github.com/absmach/supermq"
"github.com/absmach/supermq/pkg/authn"
@@ -19,56 +18,6 @@ import (
"github.com/absmach/supermq/pkg/messaging"
)
type Repository interface {
AddRule(ctx context.Context, r Rule) (Rule, error)
ViewRule(ctx context.Context, id string) (Rule, error)
UpdateRule(ctx context.Context, r Rule) (Rule, error)
UpdateRuleSchedule(ctx context.Context, r Rule) (Rule, error)
RemoveRule(ctx context.Context, id string) error
UpdateRuleStatus(ctx context.Context, r Rule) (Rule, error)
ListRules(ctx context.Context, pm PageMeta) (Page, error)
UpdateRuleDue(ctx context.Context, id string, due time.Time) (Rule, error)
}
// PageMeta contains page metadata that helps navigation.
type PageMeta struct {
Total uint64 `json:"total" db:"total"`
Offset uint64 `json:"offset" db:"offset"`
Limit uint64 `json:"limit" db:"limit"`
Dir string `json:"dir" db:"dir"`
Name string `json:"name" db:"name"`
InputChannel string `json:"input_channel,omitempty" db:"input_channel"`
InputTopic *string `json:"input_topic,omitempty" db:"input_topic"`
Scheduled *bool `json:"scheduled,omitempty"`
OutputChannel string `json:"output_channel,omitempty" db:"output_channel"`
Status Status `json:"status,omitempty" db:"status"`
Domain string `json:"domain_id,omitempty" db:"domain_id"`
ScheduledBefore *time.Time `json:"scheduled_before,omitempty" db:"scheduled_before"` // Filter rules scheduled before this time
ScheduledAfter *time.Time `json:"scheduled_after,omitempty" db:"scheduled_after"` // Filter rules scheduled after this time
Recurring *schedule.Recurring `json:"recurring,omitempty" db:"recurring"` // Filter by recurring type
}
type Page struct {
Offset uint64 `json:"offset"`
Limit uint64 `json:"limit"`
Total uint64 `json:"total"`
Rules []Rule `json:"rules"`
}
type Service interface {
messaging.MessageHandler
AddRule(ctx context.Context, session authn.Session, r Rule) (Rule, error)
ViewRule(ctx context.Context, session authn.Session, id string) (Rule, error)
UpdateRule(ctx context.Context, session authn.Session, r Rule) (Rule, error)
UpdateRuleSchedule(ctx context.Context, session authn.Session, r Rule) (Rule, error)
ListRules(ctx context.Context, session authn.Session, pm PageMeta) (Page, error)
RemoveRule(ctx context.Context, session authn.Session, id string) error
EnableRule(ctx context.Context, session authn.Session, id string) (Rule, error)
DisableRule(ctx context.Context, session authn.Session, id string) (Rule, error)
StartScheduler(ctx context.Context) error
}
type re struct {
repo Repository
runInfo chan pkglog.RunInfo
@@ -140,6 +89,17 @@ func (re *re) UpdateRule(ctx context.Context, session authn.Session, r Rule) (Ru
return rule, nil
}
func (re *re) UpdateRuleTags(ctx context.Context, session authn.Session, r Rule) (Rule, error) {
r.UpdatedAt = time.Now().UTC()
r.UpdatedBy = session.UserID
rule, err := re.repo.UpdateRuleTags(ctx, r)
if err != nil {
return Rule{}, errors.Wrap(svcerr.ErrUpdateEntity, err)
}
return rule, nil
}
func (re *re) UpdateRuleSchedule(ctx context.Context, session authn.Session, r Rule) (Rule, error) {
r.UpdatedAt = time.Now().UTC()
r.UpdatedBy = session.UserID
+57
View File
@@ -33,6 +33,7 @@ var (
domainID = testsutil.GenerateUUID(&testing.T{})
ruleName = namegen.Generate()
ruleID = testsutil.GenerateUUID(&testing.T{})
Tags = []string{"tag1", "tag2"}
inputChannel = "test.channel"
schedule = pkgSch.Schedule{
StartDateTime: time.Now().Add(-time.Hour),
@@ -270,6 +271,62 @@ func TestUpdateRule(t *testing.T) {
}
}
func TestUpdateRuleTags(t *testing.T) {
svc, repo, _, _ := newService(t, make(chan pkglog.RunInfo))
cases := []struct {
desc string
session authn.Session
updateReq re.Rule
repoResp re.Rule
repoErr error
err error
}{
{
desc: "update rule tags successfully",
session: authn.Session{
UserID: userID,
DomainID: domainID,
},
updateReq: re.Rule{
ID: testsutil.GenerateUUID(t),
Tags: []string{"tag1", "tag2"},
},
repoResp: re.Rule{
ID: testsutil.GenerateUUID(t),
Tags: []string{"tag1", "tag2"},
},
},
{
desc: "update rule tags with repo error",
session: authn.Session{
UserID: userID,
DomainID: domainID,
},
updateReq: re.Rule{
ID: testsutil.GenerateUUID(t),
Tags: []string{"tag1", "tag2"},
},
repoErr: repoerr.ErrNotFound,
err: svcerr.ErrNotFound,
},
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
repoCall := repo.On("UpdateRuleTags", context.Background(), mock.Anything).Return(tc.repoResp, tc.repoErr)
got, err := svc.UpdateRuleTags(context.Background(), tc.session, tc.updateReq)
assert.True(t, errors.Contains(err, tc.err), fmt.Sprintf("expected error %v to contain %v", err, tc.err))
if err == nil {
assert.Equal(t, tc.repoResp, got)
ok := repo.AssertCalled(t, "UpdateRuleTags", context.Background(), mock.Anything)
assert.True(t, ok, fmt.Sprintf("UpdateTags was not called on %s", tc.desc))
}
repoCall.Unset()
})
}
}
func TestListRules(t *testing.T) {
svc, repo, _, _ := newService(t, make(chan pkglog.RunInfo))
numRules := 50