mirror of
https://github.com/absmach/supermq.git
synced 2026-06-23 04:00:27 +00:00
MG-13 - Magistrala Rules engine (#16)
* Add initial Rules Engine model Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com> * WIP - Add API layer Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com> * Add async consumer Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com> * Use Named queries and single topics Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com> * Update rules listing Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com> * Enable consumers with no transformer Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com> * Move RE to addons Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com> * Fix HTTP server host Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com> * Remove unused code Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com> * Remove cache for the time being Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com> --------- Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>
This commit is contained in:
@@ -4,7 +4,7 @@
|
||||
MG_DOCKER_IMAGE_NAME_PREFIX ?= ghcr.io/absmach/magistrala
|
||||
BUILD_DIR = build
|
||||
SERVICES = auth users things http coap ws postgres-writer postgres-reader timescale-writer \
|
||||
timescale-reader cli bootstrap mqtt provision certs invitations journal
|
||||
timescale-reader cli bootstrap mqtt provision certs invitations journal re
|
||||
TEST_API_SERVICES = journal auth bootstrap certs http invitations notifiers provision readers things users
|
||||
TEST_API = $(addprefix test_api_,$(TEST_API_SERVICES))
|
||||
DOCKERS = $(addprefix docker_,$(SERVICES))
|
||||
|
||||
+210
@@ -0,0 +1,210 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
// Package main contains rule engine main function to start the service.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"log/slog"
|
||||
"net/url"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
chclient "github.com/absmach/callhome/pkg/client"
|
||||
"github.com/absmach/magistrala"
|
||||
"github.com/absmach/magistrala/consumers"
|
||||
redisclient "github.com/absmach/magistrala/internal/clients/redis"
|
||||
mglog "github.com/absmach/magistrala/logger"
|
||||
authnsvc "github.com/absmach/magistrala/pkg/authn/authsvc"
|
||||
mgauthz "github.com/absmach/magistrala/pkg/authz"
|
||||
authzsvc "github.com/absmach/magistrala/pkg/authz/authsvc"
|
||||
"github.com/absmach/magistrala/pkg/grpcclient"
|
||||
jaegerclient "github.com/absmach/magistrala/pkg/jaeger"
|
||||
"github.com/absmach/magistrala/pkg/messaging/brokers"
|
||||
brokerstracing "github.com/absmach/magistrala/pkg/messaging/brokers/tracing"
|
||||
pgclient "github.com/absmach/magistrala/pkg/postgres"
|
||||
"github.com/absmach/magistrala/pkg/server"
|
||||
httpserver "github.com/absmach/magistrala/pkg/server/http"
|
||||
"github.com/absmach/magistrala/pkg/uuid"
|
||||
"github.com/absmach/magistrala/re"
|
||||
httpapi "github.com/absmach/magistrala/re/api"
|
||||
repg "github.com/absmach/magistrala/re/postgres"
|
||||
"github.com/caarlos0/env/v11"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
const (
|
||||
svcName = "rules_engine"
|
||||
envPrefixDB = "MG_RE_DB_"
|
||||
envPrefixHTTP = "MG_RE_HTTP_"
|
||||
envPrefixAuth = "MG_AUTH_GRPC_"
|
||||
defDB = "r"
|
||||
defSvcHTTPPort = "9008"
|
||||
)
|
||||
|
||||
type config struct {
|
||||
LogLevel string `env:"MG_RE_LOG_LEVEL" envDefault:"info"`
|
||||
InstanceID string `env:"MG_RE_INSTANCE_ID" envDefault:""`
|
||||
JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
|
||||
SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"`
|
||||
ESURL string `env:"MG_ES_URL" envDefault:"nats://localhost:4222"`
|
||||
CacheURL string `env:"MG_RE_CACHE_URL" envDefault:"redis://localhost:6379/0"`
|
||||
CacheKeyDuration time.Duration `env:"MG_RE_CACHE_KEY_DURATION" envDefault:"10m"`
|
||||
TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"`
|
||||
ConfigPath string `env:"MG_RE_CONFIG_PATH" envDefault:"/config.toml"`
|
||||
BrokerURL string `env:"MG_MESSAGE_BROKER_URL" envDefault:"nats://localhost:4222"`
|
||||
}
|
||||
|
||||
func main() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
// Create new rule engine configuration
|
||||
cfg := config{}
|
||||
if err := env.Parse(&cfg); err != nil {
|
||||
log.Fatalf("failed to load %s configuration : %s", svcName, err)
|
||||
}
|
||||
|
||||
var logger *slog.Logger
|
||||
logger, err := mglog.New(os.Stdout, cfg.LogLevel)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to init logger: %s", err.Error())
|
||||
}
|
||||
|
||||
var exitCode int
|
||||
defer mglog.ExitWithError(&exitCode)
|
||||
|
||||
if cfg.InstanceID == "" {
|
||||
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
|
||||
logger.Error(fmt.Sprintf("failed to generate instanceID: %s", err))
|
||||
exitCode = 1
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Create new database for rule engine.
|
||||
dbConfig := pgclient.Config{Name: defDB}
|
||||
if err := env.ParseWithOptions(&dbConfig, env.Options{Prefix: envPrefixDB}); err != nil {
|
||||
logger.Error(err.Error())
|
||||
exitCode = 1
|
||||
return
|
||||
}
|
||||
db, err := pgclient.Setup(dbConfig, *repg.Migration())
|
||||
if err != nil {
|
||||
logger.Error(err.Error())
|
||||
exitCode = 1
|
||||
return
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
tp, err := jaegerclient.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID, cfg.TraceRatio)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err))
|
||||
exitCode = 1
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
if err := tp.Shutdown(ctx); err != nil {
|
||||
logger.Error(fmt.Sprintf("Error shutting down tracer provider: %v", err))
|
||||
}
|
||||
}()
|
||||
tracer := tp.Tracer(svcName)
|
||||
|
||||
pubSub, err := brokers.NewPubSub(ctx, cfg.BrokerURL, logger)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
|
||||
exitCode = 1
|
||||
return
|
||||
}
|
||||
defer pubSub.Close()
|
||||
|
||||
httpServerConfig := server.Config{Port: defSvcHTTPPort}
|
||||
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
|
||||
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
|
||||
exitCode = 1
|
||||
return
|
||||
}
|
||||
pubSub = brokerstracing.NewPubSub(httpServerConfig, tracer, pubSub)
|
||||
|
||||
// Setup new redis cache client
|
||||
cacheclient, err := redisclient.Connect(cfg.CacheURL)
|
||||
if err != nil {
|
||||
logger.Error(err.Error())
|
||||
exitCode = 1
|
||||
return
|
||||
}
|
||||
defer cacheclient.Close()
|
||||
|
||||
grpcCfg := grpcclient.Config{}
|
||||
if err := env.ParseWithOptions(&grpcCfg, env.Options{Prefix: envPrefixAuth}); err != nil {
|
||||
logger.Error(fmt.Sprintf("failed to load auth gRPC client configuration : %s", err))
|
||||
exitCode = 1
|
||||
return
|
||||
}
|
||||
authn, authnClient, err := authnsvc.NewAuthentication(ctx, grpcCfg)
|
||||
if err != nil {
|
||||
logger.Error(err.Error())
|
||||
exitCode = 1
|
||||
return
|
||||
}
|
||||
defer authnClient.Close()
|
||||
logger.Info("AuthN successfully connected to auth gRPC server " + authnClient.Secure())
|
||||
|
||||
authz, authzClient, err := authzsvc.NewAuthorization(ctx, grpcCfg)
|
||||
if err != nil {
|
||||
logger.Error(err.Error())
|
||||
exitCode = 1
|
||||
return
|
||||
}
|
||||
defer authzClient.Close()
|
||||
logger.Info("AuthZ successfully connected to auth gRPC server " + authnClient.Secure())
|
||||
|
||||
svc, err := newService(ctx, db, dbConfig, authz, cacheclient, cfg.CacheKeyDuration, cfg.ESURL, tracer, logger)
|
||||
if err != nil {
|
||||
logger.Error(fmt.Sprintf("failed to create services: %s", err))
|
||||
exitCode = 1
|
||||
return
|
||||
}
|
||||
|
||||
if err = consumers.Start(ctx, svcName, pubSub, svc, cfg.ConfigPath, logger); err != nil {
|
||||
logger.Error(fmt.Sprintf("failed to create Rule Engine: %s", err))
|
||||
exitCode = 1
|
||||
return
|
||||
}
|
||||
httpSvc := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(svc, authn, logger, cfg.InstanceID), logger)
|
||||
|
||||
if cfg.SendTelemetry {
|
||||
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
|
||||
go chc.CallHome(ctx)
|
||||
}
|
||||
|
||||
// Start all servers
|
||||
g.Go(func() error {
|
||||
return httpSvc.Start()
|
||||
})
|
||||
|
||||
g.Go(func() error {
|
||||
return server.StopSignalHandler(ctx, cancel, logger, svcName, httpSvc)
|
||||
})
|
||||
|
||||
if err := g.Wait(); err != nil {
|
||||
logger.Error(fmt.Sprintf("%s service terminated: %s", svcName, err))
|
||||
}
|
||||
}
|
||||
|
||||
func newService(ctx context.Context, db *sqlx.DB, dbConfig pgclient.Config, authz mgauthz.Authorization, cacheClient *redis.Client, keyDuration time.Duration, esURL string, tracer trace.Tracer, logger *slog.Logger) (re.Service, error) {
|
||||
database := pgclient.NewDatabase(db, dbConfig, tracer)
|
||||
repo := repg.NewRepository(database)
|
||||
idp := uuid.New()
|
||||
|
||||
// csvc = authzmw.AuthorizationMiddleware(csvc, authz)
|
||||
csvc := re.NewService(repo, idp, nil)
|
||||
|
||||
return csvc, nil
|
||||
}
|
||||
@@ -125,10 +125,7 @@ func loadConfig(configPath string) (config, error) {
|
||||
SubscriberCfg: subscriberConfig{
|
||||
Subjects: []string{brokers.SubjectAllChannels},
|
||||
},
|
||||
TransformerCfg: transformerConfig{
|
||||
Format: defFormat,
|
||||
ContentType: defContentType,
|
||||
},
|
||||
TransformerCfg: transformerConfig{},
|
||||
}
|
||||
|
||||
data, err := os.ReadFile(configPath)
|
||||
@@ -152,8 +149,7 @@ func makeTransformer(cfg transformerConfig, logger *slog.Logger) transformers.Tr
|
||||
logger.Info("Using JSON transformer")
|
||||
return json.New(cfg.TimeFields)
|
||||
default:
|
||||
logger.Error(fmt.Sprintf("Can't create transformer: unknown transformer type %s", cfg.Format))
|
||||
os.Exit(1)
|
||||
logger.Error(fmt.Sprintf("Can't create transformer: unknown transformer type %s; continuing without transformer", cfg.Format))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
+17
@@ -246,6 +246,23 @@ MG_THINGS_AUTH_GRPC_CLIENT_CERT=${GRPC_MTLS:+./ssl/certs/things-grpc-client.crt}
|
||||
MG_THINGS_AUTH_GRPC_CLIENT_KEY=${GRPC_MTLS:+./ssl/certs/things-grpc-client.key}
|
||||
MG_THINGS_AUTH_GRPC_CLIENT_CA_CERTS=${GRPC_MTLS:+./ssl/certs/ca.crt}
|
||||
|
||||
### RE
|
||||
MG_RE_LOG_LEVEL=debug
|
||||
MG_RE_HTTP_HOST=re
|
||||
MG_RE_HTTP_PORT=9008
|
||||
MG_RE_HTTP_SERVER_CERT=
|
||||
MG_RE_HTTP_SERVER_KEY=
|
||||
MG_RE_DB_HOST=re-db
|
||||
MG_RE_DB_PORT=5432
|
||||
MG_RE_DB_USER=magistrala
|
||||
MG_RE_DB_PASS=magistrala
|
||||
MG_RE_DB_NAME=rule_engine
|
||||
MG_RE_DB_SSL_MODE=disable
|
||||
MG_RE_DB_SSL_CERT=
|
||||
MG_RE_DB_SSL_KEY=
|
||||
MG_RE_DB_SSL_ROOT_CERT=
|
||||
MG_RE_INSTANCE_ID=
|
||||
|
||||
### HTTP
|
||||
MG_HTTP_ADAPTER_LOG_LEVEL=debug
|
||||
MG_HTTP_ADAPTER_HOST=http-adapter
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
# Copyright (c) Abstract Machines
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
# To listen all messsage broker subjects use default value "channels.>".
|
||||
# To subscribe to specific subjects use values starting by "channels." and
|
||||
# followed by a subtopic (e.g ["channels.<channel_id>.sub.topic.x", ...]).
|
||||
[subscriber]
|
||||
subjects = ["channels.>"]
|
||||
@@ -0,0 +1,92 @@
|
||||
# Copyright (c) Abstract Machines
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
# This docker-compose file contains optional Rule Egine service for Magistrala platform.
|
||||
# Since these are optional, this file is dependent of docker-compose file
|
||||
# from <project_root>/docker. In order to run these services, execute command:
|
||||
# docker compose -f docker/docker-compose.yml -f docker/addons/re/docker-compose.yml up
|
||||
# from project root. PostgreSQL port is mapped, so you can use various tools for database
|
||||
# inspection and data visualization.
|
||||
|
||||
networks:
|
||||
magistrala-base-net:
|
||||
driver: bridge
|
||||
|
||||
volumes:
|
||||
magistrala-re-db-volume:
|
||||
|
||||
services:
|
||||
re-db:
|
||||
image: postgres:16.2-alpine
|
||||
container_name: magistrala-re-db
|
||||
restart: on-failure
|
||||
command: postgres -c "max_connections=${MG_POSTGRES_MAX_CONNECTIONS}"
|
||||
environment:
|
||||
POSTGRES_USER: ${MG_RE_DB_USER}
|
||||
POSTGRES_PASSWORD: ${MG_RE_DB_PASS}
|
||||
POSTGRES_DB: ${MG_RE_DB_NAME}
|
||||
MG_POSTGRES_MAX_CONNECTIONS: ${MG_POSTGRES_MAX_CONNECTIONS}
|
||||
ports:
|
||||
- 6008:5432
|
||||
networks:
|
||||
- magistrala-base-net
|
||||
volumes:
|
||||
- magistrala-re-db-volume:/var/lib/postgresql/data
|
||||
|
||||
re:
|
||||
image: ghcr.io/absmach/magistrala/re:${MG_RELEASE_TAG}
|
||||
container_name: magistrala-re
|
||||
depends_on:
|
||||
- re-db
|
||||
restart: on-failure
|
||||
environment:
|
||||
MG_RE_LOG_LEVEL: ${MG_RE_LOG_LEVEL}
|
||||
MG_RE_HTTP_PORT: ${MG_RE_HTTP_PORT}
|
||||
MG_RE_HTTP_HOST: ${MG_RE_HTTP_HOST}
|
||||
MG_RE_HTTP_SERVER_CERT: ${MG_RE_HTTP_SERVER_CERT}
|
||||
MG_RE_HTTP_SERVER_KEY: ${MG_RE_HTTP_SERVER_KEY}
|
||||
MG_RE_DB_HOST: ${MG_RE_DB_HOST}
|
||||
MG_RE_DB_PORT: ${MG_RE_DB_PORT}
|
||||
MG_RE_DB_USER: ${MG_RE_DB_USER}
|
||||
MG_RE_DB_PASS: ${MG_RE_DB_PASS}
|
||||
MG_RE_DB_NAME: ${MG_RE_DB_NAME}
|
||||
MG_RE_DB_SSL_MODE: ${MG_RE_DB_SSL_MODE}
|
||||
MG_RE_DB_SSL_CERT: ${MG_RE_DB_SSL_CERT}
|
||||
MG_RE_DB_SSL_KEY: ${MG_RE_DB_SSL_KEY}
|
||||
MG_RE_DB_SSL_ROOT_CERT: ${MG_RE_DB_SSL_ROOT_CERT}
|
||||
MG_MESSAGE_BROKER_URL: ${MG_MESSAGE_BROKER_URL}
|
||||
MG_ES_URL: ${MG_ES_URL}
|
||||
MG_JAEGER_URL: ${MG_JAEGER_URL}
|
||||
MG_JAEGER_TRACE_RATIO: ${MG_JAEGER_TRACE_RATIO}
|
||||
MG_SEND_TELEMETRY: ${MG_SEND_TELEMETRY}
|
||||
MG_AUTH_GRPC_URL: ${MG_AUTH_GRPC_URL}
|
||||
MG_AUTH_GRPC_TIMEOUT: ${MG_AUTH_GRPC_TIMEOUT}
|
||||
MG_AUTH_GRPC_CLIENT_CERT: ${MG_AUTH_GRPC_CLIENT_CERT:+/auth-grpc-client.crt}
|
||||
MG_AUTH_GRPC_CLIENT_KEY: ${MG_AUTH_GRPC_CLIENT_KEY:+/auth-grpc-client.key}
|
||||
MG_AUTH_GRPC_SERVER_CA_CERTS: ${MG_AUTH_GRPC_SERVER_CA_CERTS:+/auth-grpc-server-ca.crt}
|
||||
MG_SPICEDB_PRE_SHARED_KEY: ${MG_SPICEDB_PRE_SHARED_KEY}
|
||||
MG_SPICEDB_HOST: ${MG_SPICEDB_HOST}
|
||||
MG_SPICEDB_PORT: ${MG_SPICEDB_PORT}
|
||||
MG_RE_INSTANCE_ID: ${MG_RE_INSTANCE_ID}
|
||||
ports:
|
||||
- ${MG_RE_HTTP_PORT}:${MG_RE_HTTP_PORT}
|
||||
networks:
|
||||
- magistrala-base-net
|
||||
volumes:
|
||||
# Auth gRPC client certificates
|
||||
- type: bind
|
||||
source: ${MG_AUTH_GRPC_CLIENT_CERT:-ssl/certs/dummy/client_cert}
|
||||
target: /auth-grpc-client${MG_AUTH_GRPC_CLIENT_CERT:+.crt}
|
||||
bind:
|
||||
create_host_path: true
|
||||
- type: bind
|
||||
source: ${MG_AUTH_GRPC_CLIENT_KEY:-ssl/certs/dummy/client_key}
|
||||
target: /auth-grpc-client${MG_AUTH_GRPC_CLIENT_KEY:+.key}
|
||||
bind:
|
||||
create_host_path: true
|
||||
- type: bind
|
||||
source: ${MG_AUTH_GRPC_SERVER_CA_CERTS:-ssl/certs/dummy/server_ca}
|
||||
target: /auth-grpc-server-ca${MG_AUTH_GRPC_SERVER_CA_CERTS:+.crt}
|
||||
bind:
|
||||
create_host_path: true
|
||||
- ./config.toml:/config.toml
|
||||
@@ -10,6 +10,7 @@
|
||||
|
||||
networks:
|
||||
magistrala-base-net:
|
||||
driver: bridge
|
||||
|
||||
volumes:
|
||||
magistrala-timescale-writer-volume:
|
||||
|
||||
@@ -16,6 +16,7 @@ require (
|
||||
github.com/cenkalti/backoff/v4 v4.3.0
|
||||
github.com/eclipse/paho.mqtt.golang v1.5.0
|
||||
github.com/fatih/color v1.18.0
|
||||
github.com/go-chi/chi v1.5.5
|
||||
github.com/go-chi/chi/v5 v5.1.0
|
||||
github.com/go-kit/kit v0.13.0
|
||||
github.com/gofrs/uuid/v5 v5.3.0
|
||||
@@ -43,6 +44,7 @@ require (
|
||||
github.com/spf13/cobra v1.8.1
|
||||
github.com/spf13/viper v1.19.0
|
||||
github.com/stretchr/testify v1.10.0
|
||||
github.com/yuin/gopher-lua v1.1.1
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.57.0
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0
|
||||
go.opentelemetry.io/otel v1.33.0
|
||||
|
||||
@@ -440,6 +440,8 @@ github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJu
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
|
||||
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
|
||||
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
# Magistrala Rule Engine
|
||||
|
||||
|
||||
[doc]: https://docs.magistrala.abstractmachines.fr
|
||||
[compose]: ../docker/docker-compose.yml
|
||||
@@ -0,0 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
// Package api contains API-related concerns: endpoint definitions, middlewares
|
||||
// and all resource representations.
|
||||
package api
|
||||
@@ -0,0 +1,97 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/absmach/magistrala/internal/api"
|
||||
"github.com/absmach/magistrala/pkg/authn"
|
||||
svcerr "github.com/absmach/magistrala/pkg/errors/service"
|
||||
"github.com/absmach/magistrala/re"
|
||||
"github.com/go-kit/kit/endpoint"
|
||||
)
|
||||
|
||||
func addRuleEndpoint(s re.Service) endpoint.Endpoint {
|
||||
return func(ctx context.Context, request interface{}) (interface{}, error) {
|
||||
session, ok := ctx.Value(api.SessionKey).(authn.Session)
|
||||
if !ok {
|
||||
return nil, svcerr.ErrAuthorization
|
||||
}
|
||||
|
||||
req := request.(addRuleReq)
|
||||
rule, err := s.AddRule(ctx, session, req.Rule)
|
||||
if err != nil {
|
||||
return addRuleRes{}, err
|
||||
}
|
||||
return addRuleRes{Rule: rule, created: true}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func viewRuleEndpoint(s re.Service) endpoint.Endpoint {
|
||||
return func(ctx context.Context, request interface{}) (interface{}, error) {
|
||||
session, ok := ctx.Value(api.SessionKey).(authn.Session)
|
||||
if !ok {
|
||||
return nil, svcerr.ErrAuthorization
|
||||
}
|
||||
|
||||
req := request.(viewRuleReq)
|
||||
rule, err := s.ViewRule(ctx, session, req.id)
|
||||
if err != nil {
|
||||
return viewRuleRes{}, err
|
||||
}
|
||||
return viewRuleRes{Rule: rule}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func updateRuleEndpoint(s re.Service) endpoint.Endpoint {
|
||||
return func(ctx context.Context, request interface{}) (interface{}, error) {
|
||||
session, ok := ctx.Value(api.SessionKey).(authn.Session)
|
||||
if !ok {
|
||||
return nil, svcerr.ErrAuthorization
|
||||
}
|
||||
|
||||
req := request.(updateRuleReq)
|
||||
rule, err := s.UpdateRule(ctx, session, req.Rule)
|
||||
if err != nil {
|
||||
return updateRuleRes{}, err
|
||||
}
|
||||
return updateRuleRes{Rule: rule}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func listRulesEndpoint(s re.Service) endpoint.Endpoint {
|
||||
return func(ctx context.Context, request interface{}) (interface{}, error) {
|
||||
session, ok := ctx.Value(api.SessionKey).(authn.Session)
|
||||
if !ok {
|
||||
return nil, svcerr.ErrAuthorization
|
||||
}
|
||||
|
||||
req := request.(listRulesReq)
|
||||
page, err := s.ListRules(ctx, session, req.PageMeta)
|
||||
if err != nil {
|
||||
return rulesPageRes{}, nil
|
||||
}
|
||||
ret := rulesPageRes{
|
||||
Rules: page.Rules,
|
||||
}
|
||||
return ret, nil
|
||||
}
|
||||
}
|
||||
|
||||
func upadateRuleStatusEndpoint(s re.Service) endpoint.Endpoint {
|
||||
return func(ctx context.Context, request interface{}) (interface{}, error) {
|
||||
session, ok := ctx.Value(api.SessionKey).(authn.Session)
|
||||
if !ok {
|
||||
return nil, svcerr.ErrAuthorization
|
||||
}
|
||||
|
||||
req := request.(changeRuleStatusReq)
|
||||
err := s.RemoveRule(ctx, session, req.id)
|
||||
if err != nil {
|
||||
return changeRoleStatusRes{false}, err
|
||||
}
|
||||
return changeRoleStatusRes{true}, nil
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,75 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
"github.com/absmach/magistrala/internal/api"
|
||||
"github.com/absmach/magistrala/pkg/apiutil"
|
||||
"github.com/absmach/magistrala/re"
|
||||
)
|
||||
|
||||
const maxLimitSize = 1000
|
||||
|
||||
type addRuleReq struct {
|
||||
re.Rule
|
||||
}
|
||||
|
||||
func (req addRuleReq) validate() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type viewRuleReq struct {
|
||||
id string
|
||||
}
|
||||
|
||||
func (req viewRuleReq) validate() error {
|
||||
if req.id == "" {
|
||||
return apiutil.ErrMissingID
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type listRulesReq struct {
|
||||
re.PageMeta
|
||||
}
|
||||
|
||||
func (req listRulesReq) validate() error {
|
||||
if req.Limit > maxLimitSize {
|
||||
return apiutil.ErrLimitSize
|
||||
}
|
||||
if req.Dir != "" && (req.Dir != api.AscDir && req.Dir != api.DescDir) {
|
||||
return apiutil.ErrInvalidDirection
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type updateRuleReq struct {
|
||||
Rule re.Rule `json:",inline"`
|
||||
}
|
||||
|
||||
func (req updateRuleReq) validate() error {
|
||||
if req.Rule.ID == "" {
|
||||
return apiutil.ErrMissingID
|
||||
}
|
||||
if len(req.Rule.Logic.Value) == 0 {
|
||||
return apiutil.ErrEmptyList
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type changeRuleStatusReq struct {
|
||||
id string
|
||||
status re.Status
|
||||
}
|
||||
|
||||
func (req changeRuleStatusReq) validate() error {
|
||||
if req.id == "" {
|
||||
return apiutil.ErrMissingID
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,139 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/absmach/magistrala"
|
||||
"github.com/absmach/magistrala/re"
|
||||
)
|
||||
|
||||
var (
|
||||
_ magistrala.Response = (*viewRuleRes)(nil)
|
||||
_ magistrala.Response = (*addRuleRes)(nil)
|
||||
_ magistrala.Response = (*changeRuleStatusRes)(nil)
|
||||
_ magistrala.Response = (*rulesPageRes)(nil)
|
||||
_ magistrala.Response = (*updateRuleRes)(nil)
|
||||
_ magistrala.Response = (*changeRoleStatusRes)(nil)
|
||||
)
|
||||
|
||||
type pageRes struct {
|
||||
Limit uint64 `json:"limit,omitempty"`
|
||||
Offset uint64 `json:"offset"`
|
||||
Total uint64 `json:"total"`
|
||||
}
|
||||
|
||||
type addRuleRes struct {
|
||||
re.Rule
|
||||
created bool
|
||||
}
|
||||
|
||||
func (res addRuleRes) Code() int {
|
||||
if res.created {
|
||||
return http.StatusCreated
|
||||
}
|
||||
|
||||
return http.StatusOK
|
||||
}
|
||||
|
||||
func (res addRuleRes) Headers() map[string]string {
|
||||
if res.created {
|
||||
return map[string]string{
|
||||
"Location": fmt.Sprintf("/rules/%s", res.ID),
|
||||
}
|
||||
}
|
||||
|
||||
return map[string]string{}
|
||||
}
|
||||
|
||||
func (res addRuleRes) Empty() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
type updateRuleRes struct {
|
||||
re.Rule `json:",inline"`
|
||||
}
|
||||
|
||||
func (res updateRuleRes) Code() int {
|
||||
return http.StatusOK
|
||||
}
|
||||
|
||||
func (res updateRuleRes) Headers() map[string]string {
|
||||
return map[string]string{}
|
||||
}
|
||||
|
||||
func (res updateRuleRes) Empty() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
type viewRuleRes struct {
|
||||
re.Rule `json:",inline"`
|
||||
}
|
||||
|
||||
func (res viewRuleRes) Code() int {
|
||||
return http.StatusOK
|
||||
}
|
||||
|
||||
func (res viewRuleRes) Headers() map[string]string {
|
||||
return map[string]string{}
|
||||
}
|
||||
|
||||
func (res viewRuleRes) Empty() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
type rulesPageRes struct {
|
||||
pageRes
|
||||
Rules []re.Rule `json:"rules"`
|
||||
}
|
||||
|
||||
func (res rulesPageRes) Code() int {
|
||||
return http.StatusOK
|
||||
}
|
||||
|
||||
func (res rulesPageRes) Headers() map[string]string {
|
||||
return map[string]string{}
|
||||
}
|
||||
|
||||
func (res rulesPageRes) Empty() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
type changeRuleStatusRes struct {
|
||||
re.Rule `json:",inline"`
|
||||
}
|
||||
|
||||
func (res changeRuleStatusRes) Code() int {
|
||||
return http.StatusOK
|
||||
}
|
||||
|
||||
func (res changeRuleStatusRes) Headers() map[string]string {
|
||||
return map[string]string{}
|
||||
}
|
||||
|
||||
func (res changeRuleStatusRes) Empty() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
type changeRoleStatusRes struct {
|
||||
deleted bool
|
||||
}
|
||||
|
||||
func (res changeRoleStatusRes) Code() int {
|
||||
if res.deleted {
|
||||
return http.StatusNoContent
|
||||
}
|
||||
|
||||
return http.StatusOK
|
||||
}
|
||||
|
||||
func (res changeRoleStatusRes) Headers() map[string]string {
|
||||
return map[string]string{}
|
||||
}
|
||||
|
||||
func (res changeRoleStatusRes) Empty() bool {
|
||||
return true
|
||||
}
|
||||
@@ -0,0 +1,147 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/absmach/magistrala"
|
||||
"github.com/absmach/magistrala/internal/api"
|
||||
"github.com/absmach/magistrala/invitations"
|
||||
"github.com/absmach/magistrala/pkg/apiutil"
|
||||
mgauthn "github.com/absmach/magistrala/pkg/authn"
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
"github.com/absmach/magistrala/re"
|
||||
"github.com/go-chi/chi"
|
||||
kithttp "github.com/go-kit/kit/transport/http"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
|
||||
)
|
||||
|
||||
const (
|
||||
idKey = "ruleID"
|
||||
inputChannelKey = "input_channel"
|
||||
outputChannelKey = "output_channel"
|
||||
statusKey = "status"
|
||||
)
|
||||
|
||||
// MakeHandler creates an HTTP handler for the service endpoints.
|
||||
func MakeHandler(svc re.Service, authn mgauthn.Authentication, logger *slog.Logger, instanceID string) http.Handler {
|
||||
opts := []kithttp.ServerOption{
|
||||
kithttp.ServerErrorEncoder(apiutil.LoggingErrorEncoder(logger, api.EncodeError)),
|
||||
}
|
||||
mux := chi.NewRouter()
|
||||
mux.Group(func(r chi.Router) {
|
||||
r.Use(api.AuthenticateMiddleware(authn, true))
|
||||
r.Route("/{domainID}/rules", func(r chi.Router) {
|
||||
r.Post("/", otelhttp.NewHandler(kithttp.NewServer(
|
||||
addRuleEndpoint(svc),
|
||||
decodeAddRuleRequest,
|
||||
api.EncodeResponse,
|
||||
opts...,
|
||||
), "create_rule").ServeHTTP)
|
||||
|
||||
r.Get("/{ruleID}", otelhttp.NewHandler(kithttp.NewServer(
|
||||
viewRuleEndpoint(svc),
|
||||
decodeViewRuleRequest,
|
||||
api.EncodeResponse,
|
||||
opts...,
|
||||
), "view_rule").ServeHTTP)
|
||||
|
||||
r.Get("/", otelhttp.NewHandler(kithttp.NewServer(
|
||||
listRulesEndpoint(svc),
|
||||
decodeListRulesRequest,
|
||||
api.EncodeResponse,
|
||||
opts...,
|
||||
), "list_rules").ServeHTTP)
|
||||
|
||||
r.Put("/{ruleID}", otelhttp.NewHandler(kithttp.NewServer(
|
||||
updateRuleEndpoint(svc),
|
||||
decodeUpdateRuleRequest,
|
||||
api.EncodeResponse,
|
||||
opts...,
|
||||
), "update_rule").ServeHTTP)
|
||||
|
||||
r.Put("/{ruleID}/status", otelhttp.NewHandler(kithttp.NewServer(
|
||||
upadateRuleStatusEndpoint(svc),
|
||||
decodeUpdateRuleStatusRequest,
|
||||
api.EncodeResponse,
|
||||
opts...,
|
||||
), "update_rule_status").ServeHTTP)
|
||||
})
|
||||
})
|
||||
|
||||
mux.Get("/health", magistrala.Health("rule_engine", instanceID))
|
||||
mux.Handle("/metrics", promhttp.Handler())
|
||||
|
||||
return mux
|
||||
}
|
||||
|
||||
func decodeAddRuleRequest(_ context.Context, r *http.Request) (interface{}, error) {
|
||||
if !strings.Contains(r.Header.Get("Content-Type"), api.ContentType) {
|
||||
return nil, errors.Wrap(apiutil.ErrValidation, apiutil.ErrUnsupportedContentType)
|
||||
}
|
||||
var rule re.Rule
|
||||
if err := json.NewDecoder(r.Body).Decode(&rule); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return addRuleReq{Rule: rule}, nil
|
||||
}
|
||||
|
||||
func decodeViewRuleRequest(_ context.Context, r *http.Request) (interface{}, error) {
|
||||
id := chi.URLParam(r, idKey)
|
||||
return viewRuleReq{id: id}, nil
|
||||
}
|
||||
|
||||
func decodeUpdateRuleRequest(_ context.Context, r *http.Request) (interface{}, error) {
|
||||
var rule re.Rule
|
||||
if err := json.NewDecoder(r.Body).Decode(&rule); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return updateRuleReq{Rule: rule}, nil
|
||||
}
|
||||
|
||||
func decodeListRulesRequest(_ context.Context, r *http.Request) (interface{}, error) {
|
||||
offset, err := apiutil.ReadNumQuery[uint64](r, api.OffsetKey, api.DefOffset)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(apiutil.ErrValidation, err)
|
||||
}
|
||||
limit, err := apiutil.ReadNumQuery[uint64](r, api.LimitKey, api.DefLimit)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(apiutil.ErrValidation, err)
|
||||
}
|
||||
ic, err := apiutil.ReadStringQuery(r, inputChannelKey, "")
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(apiutil.ErrValidation, err)
|
||||
}
|
||||
oc, err := apiutil.ReadStringQuery(r, outputChannelKey, "")
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(apiutil.ErrValidation, err)
|
||||
}
|
||||
return listRulesReq{
|
||||
PageMeta: re.PageMeta{
|
||||
Offset: offset,
|
||||
Limit: limit,
|
||||
InputChannel: ic,
|
||||
OutputChannel: oc,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func decodeUpdateRuleStatusRequest(_ context.Context, r *http.Request) (interface{}, error) {
|
||||
id := r.URL.Query().Get(idKey)
|
||||
status, err := apiutil.ReadStringQuery(r, statusKey, invitations.All.String())
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(apiutil.ErrValidation, err)
|
||||
}
|
||||
s, err := re.ToStatus(status)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(apiutil.ErrValidation, err)
|
||||
}
|
||||
return changeRuleStatusReq{id: id, status: s}, nil
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
// Package re contain the domain concept definitions needed to
|
||||
// support Magistrala Rule Egine services functionality.
|
||||
package re
|
||||
@@ -0,0 +1,45 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package postgres
|
||||
|
||||
import (
|
||||
_ "github.com/jackc/pgx/v5/stdlib" // required for SQL access
|
||||
migrate "github.com/rubenv/sql-migrate"
|
||||
)
|
||||
|
||||
func Migration() *migrate.MemoryMigrationSource {
|
||||
return &migrate.MemoryMigrationSource{
|
||||
Migrations: []*migrate.Migration{
|
||||
{
|
||||
Id: "rules_01",
|
||||
// VARCHAR(36) for colums with IDs as UUIDS have a maximum of 36 characters
|
||||
// STATUS 0 to imply enabled and 1 to imply disabled
|
||||
Up: []string{
|
||||
`CREATE TABLE IF NOT EXISTS rules (
|
||||
id VARCHAR(36) PRIMARY KEY,
|
||||
name VARCHAR(1024),
|
||||
domain_id VARCHAR(36) NOT NULL,
|
||||
metadata JSONB,
|
||||
created_at TIMESTAMP,
|
||||
updated_at TIMESTAMP,
|
||||
updated_by VARCHAR(254),
|
||||
input_channel VARCHAR(36),
|
||||
input_topic TEXT,
|
||||
output_channel VARCHAR(36),
|
||||
output_topic TEXT,
|
||||
status SMALLINT NOT NULL DEFAULT 0 CHECK (status >= 0),
|
||||
logic_type SMALLINT NOT NULL DEFAULT 0 CHECK (status >= 0),
|
||||
logic_value BYTEA,
|
||||
recurring_time TIMESTAMP[],
|
||||
recurring_type SMALLINT,
|
||||
recurring_period SMALLINT
|
||||
)`,
|
||||
},
|
||||
Down: []string{
|
||||
`DROP TABLE IF EXISTS rules`,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,174 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/absmach/magistrala/pkg/errors"
|
||||
repoerr "github.com/absmach/magistrala/pkg/errors/repository"
|
||||
"github.com/absmach/magistrala/pkg/postgres"
|
||||
"github.com/absmach/magistrala/re"
|
||||
)
|
||||
|
||||
// SQL Queries as Strings
|
||||
const (
|
||||
addRuleQuery = `
|
||||
INSERT INTO rules (id, domain_id, input_channel, input_topic, logic_type, logic_value,
|
||||
output_channel, output_topic, recurring_time, recurring_type, recurring_period, status)
|
||||
VALUES (:id, :domain_id, :input_channel, :input_topic, :logic_type, :logic_value,
|
||||
:output_channel, :output_topic, :recurring_time, :recurring_type, :recurring_period, :status)
|
||||
RETURNING id;
|
||||
`
|
||||
|
||||
viewRuleQuery = `
|
||||
SELECT id, domain_id, input_channel, input_topic, logic_type, logic_value, output_channel,
|
||||
output_topic, recurring_time, recurring_type, recurring_period, status
|
||||
FROM rules
|
||||
WHERE id = $1;
|
||||
`
|
||||
|
||||
updateRuleQuery = `
|
||||
UPDATE rules
|
||||
SET input_channel = :input_channel, input_topic = :input_topic, logic_type = :logic_type,
|
||||
logic_value = :logic_value, output_channel = :output_channel, output_topic = :output_topic,
|
||||
recurring_time = :recurring_time, recurring_type = :recurring_type,
|
||||
recurring_period = :recurring_period, status = :status
|
||||
WHERE id = :id;
|
||||
`
|
||||
|
||||
removeRuleQuery = `
|
||||
DELETE FROM rules
|
||||
WHERE id = $1;
|
||||
`
|
||||
|
||||
listRulesQuery = `
|
||||
SELECT id, domain_id, input_channel, input_topic, logic_type, logic_value, output_channel,
|
||||
output_topic, recurring_time, recurring_type, recurring_period, status
|
||||
FROM rules r %s %s;
|
||||
`
|
||||
|
||||
totalQuery = `SELECT COUNT(*) FROM rules r %s;`
|
||||
)
|
||||
|
||||
type PostgresRepository struct {
|
||||
DB postgres.Database
|
||||
}
|
||||
|
||||
func NewRepository(db postgres.Database) re.Repository {
|
||||
return &PostgresRepository{DB: db}
|
||||
}
|
||||
|
||||
func (repo *PostgresRepository) AddRule(ctx context.Context, r re.Rule) (re.Rule, error) {
|
||||
dbr := ruleToDb(r)
|
||||
_, err := repo.DB.NamedExecContext(ctx, addRuleQuery, dbr)
|
||||
if err != nil {
|
||||
return re.Rule{}, err
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (repo *PostgresRepository) ViewRule(ctx context.Context, id string) (re.Rule, error) {
|
||||
row := repo.DB.QueryRowxContext(ctx, viewRuleQuery, id)
|
||||
if err := row.Err(); err != nil {
|
||||
return re.Rule{}, err
|
||||
}
|
||||
var dbr dbRule
|
||||
err := row.StructScan(&dbr)
|
||||
if err != nil {
|
||||
return re.Rule{}, err
|
||||
}
|
||||
ret := dbToRule(dbr)
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func (repo *PostgresRepository) UpdateRule(ctx context.Context, r re.Rule) (re.Rule, error) {
|
||||
dbr := ruleToDb(r)
|
||||
result, err := repo.DB.NamedExecContext(ctx, updateRuleQuery, dbr)
|
||||
if err != nil {
|
||||
return re.Rule{}, err
|
||||
}
|
||||
|
||||
if _, err := result.RowsAffected(); err != nil {
|
||||
return re.Rule{}, repoerr.ErrNotFound
|
||||
}
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (repo *PostgresRepository) RemoveRule(ctx context.Context, id string) error {
|
||||
result, err := repo.DB.ExecContext(ctx, removeRuleQuery, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := result.RowsAffected(); err != nil {
|
||||
return repoerr.ErrNotFound
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (repo *PostgresRepository) ListRules(ctx context.Context, pm re.PageMeta) (re.Page, error) {
|
||||
pgData := ""
|
||||
if pm.Limit != 0 {
|
||||
pgData = "LIMIT :limit"
|
||||
}
|
||||
if pm.Offset != 0 {
|
||||
pgData += " OFFEST :offset"
|
||||
}
|
||||
pq := pageQuery(pm)
|
||||
q := fmt.Sprintf(listRulesQuery, pq, pgData)
|
||||
rows, err := repo.DB.NamedQueryContext(ctx, q, pm)
|
||||
if err != nil {
|
||||
return re.Page{}, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var rules []re.Rule
|
||||
var r dbRule
|
||||
for rows.Next() {
|
||||
if err := rows.StructScan(&r); err != nil {
|
||||
return re.Page{}, errors.Wrap(repoerr.ErrViewEntity, err)
|
||||
}
|
||||
rules = append(rules, dbToRule(r))
|
||||
}
|
||||
|
||||
cq := fmt.Sprintf(totalQuery, pq)
|
||||
|
||||
total, err := postgres.Total(ctx, repo.DB, cq, pm)
|
||||
if err != nil {
|
||||
return re.Page{}, errors.Wrap(repoerr.ErrViewEntity, err)
|
||||
}
|
||||
pm.Total = total
|
||||
ret := re.Page{
|
||||
PageMeta: pm,
|
||||
Rules: rules,
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func pageQuery(pm re.PageMeta) string {
|
||||
var query []string
|
||||
if pm.InputChannel != "" {
|
||||
query = append(query, "r.input_channel = :input_channel")
|
||||
}
|
||||
if pm.OutputChannel != "" {
|
||||
query = append(query, "r.output_channel = :output_channel")
|
||||
}
|
||||
if pm.Status != re.AllStatus {
|
||||
query = append(query, "r.status = :status")
|
||||
}
|
||||
|
||||
var q string
|
||||
if len(query) > 0 {
|
||||
q = fmt.Sprintf("WHERE %s", strings.Join(query, " AND "))
|
||||
}
|
||||
|
||||
return q
|
||||
}
|
||||
@@ -0,0 +1,118 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"time"
|
||||
|
||||
"github.com/absmach/magistrala/re"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
)
|
||||
|
||||
// dbRule represents the database structure for a Rule.
|
||||
type dbRule struct {
|
||||
ID string `db:"id"`
|
||||
DomainID string `db:"domain_id"`
|
||||
InputChannel string `db:"input_channel"`
|
||||
InputTopic sql.NullString `db:"input_topic"`
|
||||
LogicType re.ScriptType `db:"logic_type"`
|
||||
LogicValue string `db:"logic_value"`
|
||||
OutputChannel sql.NullString `db:"output_channel"`
|
||||
OutputTopic sql.NullString `db:"output_topic"`
|
||||
RecurringTime *pgtype.Array[string] `db:"recurring_time"`
|
||||
RecurringType re.ReccuringType `db:"recurring_type"`
|
||||
RecurringPeriod uint `db:"recurring_period"`
|
||||
Status re.Status `db:"status"`
|
||||
CreatedAt time.Time `db:"created_at"`
|
||||
CreatedBy string `db:"created_by"`
|
||||
UpdatedAt time.Time `db:"updated_at"`
|
||||
UpdatedBy string `db:"updated_by"`
|
||||
}
|
||||
|
||||
func ruleToDb(r re.Rule) dbRule {
|
||||
return dbRule{
|
||||
ID: r.ID,
|
||||
DomainID: r.DomainID,
|
||||
InputChannel: r.InputChannel,
|
||||
InputTopic: toNullString(r.InputTopic),
|
||||
LogicType: r.Logic.Type,
|
||||
LogicValue: r.Logic.Value,
|
||||
OutputChannel: toNullString(r.OutputChannel),
|
||||
OutputTopic: toNullString(r.OutputTopic),
|
||||
RecurringTime: toStringArray(r.Schedule.Time),
|
||||
RecurringType: r.Schedule.RecurringType,
|
||||
RecurringPeriod: r.Schedule.RecurringPeriod,
|
||||
Status: r.Status,
|
||||
CreatedAt: r.CreatedAt,
|
||||
CreatedBy: r.CreatedBy,
|
||||
UpdatedAt: r.UpdatedAt,
|
||||
UpdatedBy: r.UpdatedBy,
|
||||
}
|
||||
}
|
||||
|
||||
func dbToRule(dto dbRule) re.Rule {
|
||||
return re.Rule{
|
||||
ID: dto.ID,
|
||||
DomainID: dto.DomainID,
|
||||
InputChannel: dto.InputChannel,
|
||||
InputTopic: fromNullString(dto.InputTopic),
|
||||
Logic: re.Script{
|
||||
Type: dto.LogicType,
|
||||
Value: dto.LogicValue,
|
||||
},
|
||||
OutputChannel: fromNullString(dto.OutputChannel),
|
||||
OutputTopic: fromNullString(dto.OutputTopic),
|
||||
Schedule: re.Schedule{
|
||||
Time: toTimeSlice(dto.RecurringTime),
|
||||
RecurringType: dto.RecurringType,
|
||||
RecurringPeriod: dto.RecurringPeriod,
|
||||
},
|
||||
Status: re.Status(dto.Status),
|
||||
CreatedAt: dto.CreatedAt,
|
||||
CreatedBy: dto.CreatedBy,
|
||||
UpdatedAt: dto.UpdatedAt,
|
||||
UpdatedBy: dto.UpdatedBy,
|
||||
}
|
||||
}
|
||||
|
||||
func toNullString(value string) sql.NullString {
|
||||
if value == "" {
|
||||
return sql.NullString{Valid: false}
|
||||
}
|
||||
return sql.NullString{String: value, Valid: true}
|
||||
}
|
||||
|
||||
func fromNullString(nullString sql.NullString) string {
|
||||
if !nullString.Valid {
|
||||
return ""
|
||||
}
|
||||
return nullString.String
|
||||
}
|
||||
|
||||
func toStringArray(times []time.Time) *pgtype.Array[string] {
|
||||
var strArray []string
|
||||
for _, t := range times {
|
||||
strArray = append(strArray, t.Format(time.RFC3339))
|
||||
}
|
||||
ret := pgtype.Array[string]{
|
||||
Elements: strArray,
|
||||
Valid: true,
|
||||
}
|
||||
return &ret
|
||||
}
|
||||
|
||||
func toTimeSlice(strArray *pgtype.Array[string]) []time.Time {
|
||||
if strArray == nil || !strArray.Valid {
|
||||
return []time.Time{}
|
||||
}
|
||||
var times []time.Time
|
||||
for _, s := range strArray.Elements {
|
||||
t, err := time.Parse(time.RFC3339, s)
|
||||
if err == nil {
|
||||
times = append(times, t)
|
||||
}
|
||||
}
|
||||
return times
|
||||
}
|
||||
+201
@@ -0,0 +1,201 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package re
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/absmach/magistrala"
|
||||
"github.com/absmach/magistrala/consumers"
|
||||
"github.com/absmach/magistrala/pkg/authn"
|
||||
"github.com/absmach/magistrala/pkg/messaging"
|
||||
mgjson "github.com/absmach/magistrala/pkg/transformers/json"
|
||||
lua "github.com/yuin/gopher-lua"
|
||||
)
|
||||
|
||||
type ScriptType uint
|
||||
|
||||
type Script struct {
|
||||
Type ScriptType `json:"type"`
|
||||
Value string `json:"value"`
|
||||
}
|
||||
|
||||
// daily, weekly or monthly
|
||||
type ReccuringType uint
|
||||
|
||||
const (
|
||||
None ReccuringType = iota
|
||||
Daily
|
||||
Weekly
|
||||
Monthly
|
||||
)
|
||||
|
||||
type Schedule struct {
|
||||
Time []time.Time `json:"date,omitempty"`
|
||||
RecurringType ReccuringType
|
||||
RecurringPeriod uint // 1 meaning every Recurring value, 2 meaning every other, and so on.
|
||||
}
|
||||
|
||||
type Rule struct {
|
||||
ID string `json:"id"`
|
||||
DomainID string `json:"domain"`
|
||||
InputChannel string `json:"input_channel"`
|
||||
InputTopic string `json:"input_topic"`
|
||||
Logic Script `json:"logic"`
|
||||
OutputChannel string `json:"output_channel,omitempty"`
|
||||
OutputTopic string `json:"output_topic,omitempty"`
|
||||
Schedule Schedule `json:"schedule,omitempty"`
|
||||
Status Status `json:"status"`
|
||||
CreatedAt time.Time `json:"created_at,omitempty"`
|
||||
CreatedBy string `json:"created_by,omitempty"`
|
||||
UpdatedAt time.Time `json:"updated_at,omitempty"`
|
||||
UpdatedBy string `json:"updated_by,omitempty"`
|
||||
}
|
||||
|
||||
type Repository interface {
|
||||
AddRule(ctx context.Context, r Rule) (Rule, error)
|
||||
ViewRule(ctx context.Context, id string) (Rule, error)
|
||||
UpdateRule(ctx context.Context, r Rule) (Rule, error)
|
||||
RemoveRule(ctx context.Context, id string) error
|
||||
ListRules(ctx context.Context, pm PageMeta) (Page, error)
|
||||
}
|
||||
|
||||
// PageMeta contains page metadata that helps navigation.
|
||||
type PageMeta struct {
|
||||
Total uint64 `json:"total" db:"total"`
|
||||
Offset uint64 `json:"offset" db:"offset"`
|
||||
Limit uint64 `json:"limit" db:"limit"`
|
||||
Dir string `json:"dir" db:"dir"`
|
||||
Name string `json:"name" db:"name"`
|
||||
InputChannel string `json:"input_channel,omitempty" db:"input_channel"`
|
||||
OutputChannel string `json:"output_channel,omitempty" db:"output_channel"`
|
||||
Status Status `json:"status,omitempty" db:"status"`
|
||||
}
|
||||
|
||||
type Page struct {
|
||||
PageMeta
|
||||
Rules []Rule `json:"rules"`
|
||||
}
|
||||
|
||||
type Service interface {
|
||||
consumers.AsyncConsumer
|
||||
AddRule(ctx context.Context, session authn.Session, r Rule) (Rule, error)
|
||||
ViewRule(ctx context.Context, session authn.Session, id string) (Rule, error)
|
||||
UpdateRule(ctx context.Context, session authn.Session, r Rule) (Rule, error)
|
||||
ListRules(ctx context.Context, session authn.Session, pm PageMeta) (Page, error)
|
||||
RemoveRule(ctx context.Context, session authn.Session, id string) error
|
||||
}
|
||||
|
||||
type re struct {
|
||||
idp magistrala.IDProvider
|
||||
repo Repository
|
||||
pubSub messaging.PubSub
|
||||
errors chan error
|
||||
}
|
||||
|
||||
func NewService(repo Repository, idp magistrala.IDProvider, pubSub messaging.PubSub) Service {
|
||||
return &re{
|
||||
repo: repo,
|
||||
idp: idp,
|
||||
pubSub: pubSub,
|
||||
errors: make(chan error),
|
||||
}
|
||||
}
|
||||
|
||||
func (re *re) AddRule(ctx context.Context, session authn.Session, r Rule) (Rule, error) {
|
||||
id, err := re.idp.ID()
|
||||
if err != nil {
|
||||
return Rule{}, err
|
||||
}
|
||||
r.CreatedAt = time.Now()
|
||||
r.ID = id
|
||||
r.CreatedBy = session.UserID
|
||||
r.DomainID = session.DomainID
|
||||
r.Status = EnabledStatus
|
||||
return re.repo.AddRule(ctx, r)
|
||||
}
|
||||
|
||||
func (re *re) ViewRule(ctx context.Context, session authn.Session, id string) (Rule, error) {
|
||||
return re.repo.ViewRule(ctx, id)
|
||||
}
|
||||
|
||||
func (re *re) UpdateRule(ctx context.Context, session authn.Session, r Rule) (Rule, error) {
|
||||
return re.repo.UpdateRule(ctx, r)
|
||||
}
|
||||
|
||||
func (re *re) ListRules(ctx context.Context, session authn.Session, pm PageMeta) (Page, error) {
|
||||
return re.repo.ListRules(ctx, pm)
|
||||
}
|
||||
|
||||
func (re *re) RemoveRule(ctx context.Context, session authn.Session, id string) error {
|
||||
return re.repo.RemoveRule(ctx, id)
|
||||
}
|
||||
|
||||
func (re *re) ConsumeAsync(ctx context.Context, msgs interface{}) {
|
||||
switch m := msgs.(type) {
|
||||
case *messaging.Message:
|
||||
pm := PageMeta{
|
||||
InputChannel: m.Channel,
|
||||
Status: EnabledStatus,
|
||||
}
|
||||
page, err := re.repo.ListRules(ctx, pm)
|
||||
if err != nil {
|
||||
re.errors <- err
|
||||
return
|
||||
}
|
||||
for _, r := range page.Rules {
|
||||
go re.process(r, m)
|
||||
}
|
||||
case mgjson.Message:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (re *re) Errors() <-chan error {
|
||||
return re.errors
|
||||
}
|
||||
|
||||
func (re *re) process(r Rule, msg *messaging.Message) error {
|
||||
l := lua.NewState()
|
||||
defer l.Close()
|
||||
|
||||
message := l.NewTable()
|
||||
|
||||
l.RawSet(message, lua.LString("channel"), lua.LString(msg.Channel))
|
||||
l.RawSet(message, lua.LString("subtopic"), lua.LString(msg.Subtopic))
|
||||
l.RawSet(message, lua.LString("publisher"), lua.LString(msg.Publisher))
|
||||
l.RawSet(message, lua.LString("protocol"), lua.LString(msg.Protocol))
|
||||
l.RawSet(message, lua.LString("created"), lua.LNumber(msg.Created))
|
||||
|
||||
pld := l.NewTable()
|
||||
for i, b := range msg.Payload {
|
||||
l.RawSet(pld, lua.LNumber(i+1), lua.LNumber(b)) // Lua tables are 1-indexed
|
||||
}
|
||||
l.RawSet(message, lua.LString("payload"), pld)
|
||||
|
||||
// Set the message object as a Lua global variable.
|
||||
l.SetGlobal("message", message)
|
||||
|
||||
if err := l.DoString(string(r.Logic.Value)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
result := l.Get(-1) // Get the last result
|
||||
switch result {
|
||||
case lua.LNil:
|
||||
return nil
|
||||
default:
|
||||
if len(r.OutputChannel) == 0 {
|
||||
return nil
|
||||
}
|
||||
m := &messaging.Message{
|
||||
Publisher: "magistrala.re",
|
||||
Created: time.Now().Unix(),
|
||||
Payload: []byte(result.String()),
|
||||
}
|
||||
re.pubSub.Publish(context.Background(), m.Channel, m)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,80 @@
|
||||
// Copyright (c) Abstract Machines
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package re
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
|
||||
svcerr "github.com/absmach/magistrala/pkg/errors/service"
|
||||
)
|
||||
|
||||
// Status represents Rule status.
|
||||
type Status uint8
|
||||
|
||||
// Possible User status values.
|
||||
const (
|
||||
// EnabledStatus represents enabled Rule.
|
||||
EnabledStatus Status = iota
|
||||
// DisabledStatus represents disabled Rule.
|
||||
DisabledStatus
|
||||
// DeletedStatus represents a rule that will be deleted.
|
||||
DeletedStatus
|
||||
|
||||
// AllStatus is used for querying purposes to list rules irrespective
|
||||
// of their status - both enabled and disabled. It is never stored in the
|
||||
// database as the actual User status and should always be the largest
|
||||
// value in this enumeration.
|
||||
AllStatus
|
||||
)
|
||||
|
||||
// String representation of the possible status values.
|
||||
const (
|
||||
Disabled = "disabled"
|
||||
Enabled = "enabled"
|
||||
Deleted = "deleted"
|
||||
All = "all"
|
||||
Unknown = "unknown"
|
||||
)
|
||||
|
||||
func (s Status) String() string {
|
||||
switch s {
|
||||
case DisabledStatus:
|
||||
return Disabled
|
||||
case EnabledStatus:
|
||||
return Enabled
|
||||
case DeletedStatus:
|
||||
return Deleted
|
||||
case AllStatus:
|
||||
return All
|
||||
default:
|
||||
return Unknown
|
||||
}
|
||||
}
|
||||
|
||||
// ToStatus converts string value to a valid status.
|
||||
func ToStatus(status string) (Status, error) {
|
||||
switch status {
|
||||
case "", Enabled:
|
||||
return EnabledStatus, nil
|
||||
case Disabled:
|
||||
return DisabledStatus, nil
|
||||
case Deleted:
|
||||
return DeletedStatus, nil
|
||||
case All:
|
||||
return AllStatus, nil
|
||||
}
|
||||
return Status(0), svcerr.ErrInvalidStatus
|
||||
}
|
||||
|
||||
func (s Status) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(s.String())
|
||||
}
|
||||
|
||||
func (s *Status) UnmarshalJSON(data []byte) error {
|
||||
str := strings.Trim(string(data), "\"")
|
||||
val, err := ToStatus(str)
|
||||
*s = val
|
||||
return err
|
||||
}
|
||||
Reference in New Issue
Block a user