Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>
Magistrala Rules Engine
The Magistrala Rules Engine (RE) is a service that enables real-time message processing and transformation through user-defined rules. It allows you to create rules that process incoming messages using Lua scripts and publish the results to output channels.
Architecture
The Rules Engine operates by:
- Listening for messages on configured input channels
- Processing these messages through Lua scripts
- Optionally publishing results to output channels
- Supporting scheduled rule execution based on various recurring patterns
Core Concepts
Rules
A rule consists of:
- id - Unique identifier
- name - Human-readable name
- domain - Domain the rule belongs to
- input_channel - Channel to listen for incoming messages
- input_topic - Specific topic within the input channel
- logic - Lua script that processes the message
- output_channel - (Optional) Channel to publish results to
- output_topic - (Optional) Topic within the output channel
- schedule - (Optional) Scheduling configuration
- status - Rule state (enabled/disabled/deleted)
- metadata - Additional rule metadata
A rule can be in one of these states:
- enabled - Rule is active and processing messages
- disabled - Rule is inactive and won't process messages
- deleted - Rule is marked for deletion
Message Processing
When a message arrives on a rule's input channel, the Rules Engine:
- Creates a Lua environment
- Injects the message as a global variable with the following structure:
message = {
    channel = "channel_name",
    subtopic = "subtopic_name",
    publisher = "publisher_id",
    protocol = "protocol_name",
    created = timestamp,
    payload = [byte_array]
}
- Executes the rule's Lua script
- If the script returns a non-nil value and an output channel is configured, publishes the result
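The publish decision in the last step can be sketched in Go as follows. This is a minimal illustration, not the service's actual code: the Message type, its field types, and the shouldPublish helper are hypothetical names introduced here, and running the Lua script itself is left out.

```go
package main

import "fmt"

// Message mirrors the fields of the Lua message table described above.
// The Go field names and types are assumptions made for this sketch.
type Message struct {
	Channel   string
	Subtopic  string
	Publisher string
	Protocol  string
	Created   int64 // timestamp; the unit is not specified in this document
	Payload   []byte
}

// shouldPublish (hypothetical helper) reports whether a script result should
// be forwarded: the result must be non-nil and an output channel must be set.
func shouldPublish(result any, outputChannel string) bool {
	return result != nil && outputChannel != ""
}

func main() {
	msg := Message{Channel: "sensors", Subtopic: "temperature", Payload: []byte("31")}

	// Stand-in for the value a rule's Lua script would return for msg.
	var result any = "Temperature too high!"

	fmt.Println(msg.Channel, shouldPublish(result, "alerts")) // result and channel present
	fmt.Println(msg.Channel, shouldPublish(nil, "alerts"))    // script returned nil
	fmt.Println(msg.Channel, shouldPublish(result, ""))       // no output channel configured
}
```

Only the first call reports true; a nil result or a missing output channel means nothing is published.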
Scheduling
Rules can be scheduled to run at specific times with various recurring patterns. The scheduler works through several key components:
Schedule Structure
type Schedule struct {
    StartDateTime   time.Time // When the schedule becomes active
    Time            time.Time // Specific time for the rule to run
    Recurring       Recurring // None, Daily, Weekly, Monthly
    RecurringPeriod uint      // Interval between executions: 1 = every interval, 2 = every second interval, etc.
}
How Scheduling Works
- Initialization:
  - The scheduler starts when the service begins running via StartScheduler()
  - It uses a ticker to check for rules that need to be executed at regular intervals
- Rule Evaluation:
  - For each tick, the scheduler:
    - Gets all enabled rules scheduled before the current time
    - For each rule, checks if it should run using shouldRunRule()
    - If a rule should run, processes it asynchronously
- Execution Timing: The shouldRunRule() function determines if a rule should run by checking:
  - If the rule's start time has been reached
  - If the current time matches the scheduled execution time
  - For recurring rules:
    - Daily: Checks if the correct number of days have passed since start
    - Weekly: Checks if the correct number of weeks have passed since start
    - Monthly: Checks if the correct number of months have passed since start
- Recurring Patterns:
  - None: Rule runs once at the specified time
  - Daily: Rule runs every N days, where N is the RecurringPeriod
  - Weekly: Rule runs every N weeks
  - Monthly: Rule runs every N months
For example, to run a rule:
- Every day at 9 AM: Set time to 9 AM and recurring to "daily" with recurring_period = 1
- Every other week: Set recurring to "weekly" with recurring_period = 2
- Monthly on the 1st: Set recurring to "monthly" with recurring_period = 1
API Operations
The Rules Engine service provides the following operations:
- AddRule - Create a new rule
- ViewRule - Retrieve a specific rule
- UpdateRule - Modify an existing rule
- ListRules - Query rules with filtering options
- RemoveRule - Delete a rule
- EnableRule - Activate a rule
- DisableRule - Deactivate a rule
Using the API
Adding a Rule
You can create a new rule using the Rules Engine API. Here's an example using curl:
curl --location 'http://localhost:9008/8353542f-d8f1-4dce-b787-4af3712f117e/rules' \
  --header 'Content-Type: application/json' \
  --header 'Authorization: Bearer <access_token>' \
  --data '{
    "name": "High Temperature Alert",
    "input_channel": "sensors",
    "input_topic": "temperature",
    "logic": {
      "type": 0,
      "value": "if message.payload > 30 then return '\''Temperature too high!'\'' end"
    },
    "output_channel": "alerts",
    "output_topic": "temperature",
    "schedule": {
      "start_datetime": "2024-01-01T00:00",
      "time": "2024-01-01T09:00",
      "recurring": "daily",
      "recurring_period": 1
    }
  }'
This request:
- Creates a temperature monitoring rule
- Processes messages from the "sensors" channel
- Checks for temperatures above 30 degrees
- Publishes alerts to the "alerts" channel
- Runs daily at 9 AM
The API endpoint follows the format: http://localhost:9008/{domain_id}/rules
Required headers:
- Content-Type: application/json - Specifies the request body format
- Authorization: Bearer <access_token> - Your authentication token
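For clients written in Go, the same request can be assembled with the standard net/http package. The sketch below only builds the request; the newAddRuleRequest helper is a hypothetical name, and the request body is shortened for brevity.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// newAddRuleRequest (hypothetical helper) builds the POST request for
// creating a rule at {baseURL}/{domainID}/rules with the required headers.
func newAddRuleRequest(baseURL, domainID, token string, body []byte) (*http.Request, error) {
	url := fmt.Sprintf("%s/%s/rules", baseURL, domainID)
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+token)
	return req, nil
}

func main() {
	// Shortened body; see the curl example above for a full rule.
	body := []byte(`{"name": "High Temperature Alert", "input_channel": "sensors"}`)

	req, err := newAddRuleRequest(
		"http://localhost:9008",
		"8353542f-d8f1-4dce-b787-4af3712f117e",
		"<access_token>",
		body,
	)
	if err != nil {
		fmt.Println("building request failed:", err)
		return
	}
	fmt.Println(req.Method, req.URL.String())
	// To actually send it: resp, err := http.DefaultClient.Do(req)
}
```

Keeping request construction separate from sending makes the URL and headers easy to check without a running service.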
Example Rule Structure
Here's a breakdown of the rule structure:
{
  "name": "High Temperature Alert",
  "input_channel": "sensors",
  "input_topic": "temperature",
  "logic": {
    "type": 0,
    "value": "if message.payload > 30 then return 'Temperature too high!' end"
  },
  "output_channel": "alerts",
  "output_topic": "temperature",
  "schedule": {
    "start_datetime": "2024-01-01T00:00",
    "time": "2024-01-01T09:00",
    "recurring": "daily",
    "recurring_period": 1
  }
}
This rule:
- Listens on the "sensors" channel, "temperature" topic
- Checks if temperature exceeds 30 degrees
- If true, publishes an alert message
- Runs daily at 9 AM
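To work with this payload from Go, the JSON can be mapped onto plain structs. The types below (RuleJSON, Logic, ScheduleJSON) and the parseRule helper are hypothetical and derived only from the example above, not from the service's source. The date fields are kept as strings because values such as "2024-01-01T00:00" are not in the RFC 3339 form that Go's time.Time JSON decoding expects.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Logic holds the script; the meaning of Type is not described in this document.
type Logic struct {
	Type  int    `json:"type"`
	Value string `json:"value"`
}

// ScheduleJSON keeps dates as strings (see the note above the code).
type ScheduleJSON struct {
	StartDatetime   string `json:"start_datetime"`
	Time            string `json:"time"`
	Recurring       string `json:"recurring"`
	RecurringPeriod uint   `json:"recurring_period"`
}

// RuleJSON mirrors the example request body; optional parts use omitempty.
type RuleJSON struct {
	Name          string        `json:"name"`
	InputChannel  string        `json:"input_channel"`
	InputTopic    string        `json:"input_topic"`
	Logic         Logic         `json:"logic"`
	OutputChannel string        `json:"output_channel,omitempty"`
	OutputTopic   string        `json:"output_topic,omitempty"`
	Schedule      *ScheduleJSON `json:"schedule,omitempty"`
}

// parseRule decodes a rule payload into RuleJSON.
func parseRule(data []byte) (RuleJSON, error) {
	var r RuleJSON
	err := json.Unmarshal(data, &r)
	return r, err
}

func main() {
	raw := []byte(`{
		"name": "High Temperature Alert",
		"input_channel": "sensors",
		"input_topic": "temperature",
		"logic": {"type": 0, "value": "if message.payload > 30 then return 'Temperature too high!' end"},
		"output_channel": "alerts",
		"output_topic": "temperature",
		"schedule": {"start_datetime": "2024-01-01T00:00", "time": "2024-01-01T09:00", "recurring": "daily", "recurring_period": 1}
	}`)

	r, err := parseRule(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(r.Name, r.Schedule.Recurring, r.Schedule.RecurringPeriod)
}
```

Using a pointer for the schedule lets a rule without one decode to nil, which matches the field being optional.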
Running the Service
To start the Rules Engine service, run:
make run_addons re
This command starts the Rules Engine service using Docker Compose with the configuration defined in docker-compose.yaml.