Signed-off-by: dusan <borovcanindusan1@gmail.com>
Writers
Writers consume messages from the message broker, normalize them (SenML or JSON), and persist them to a storage backend. Magistrala provides two writer services:
- Postgres writer: Stores data in PostgreSQL.
- Timescale writer: Stores data in TimescaleDB and uses hypertables for time-series workloads.
Writers are optional services and are treated as plugins. Core services and the message broker must be running first. For platform dependencies, see Docker Compose.
Configuration
Values shown are from docker/.env and the add-on compose files in docker/addons/*-writer/docker-compose.yaml.
Postgres writer
Postgres Service endpoints
| Variable | Description | Default |
|---|---|---|
| MG_POSTGRES_WRITER_LOG_LEVEL | Service log level | debug |
| MG_POSTGRES_WRITER_CONFIG_PATH | Config file path (topics/transformer) | /config.toml |
| MG_POSTGRES_WRITER_HTTP_HOST | HTTP host | postgres-writer |
| MG_POSTGRES_WRITER_HTTP_PORT | HTTP port | 9007 |
| MG_POSTGRES_WRITER_HTTP_SERVER_CERT | HTTPS server certificate path | "" |
| MG_POSTGRES_WRITER_HTTP_SERVER_KEY | HTTPS server key path | "" |
| MG_POSTGRES_WRITER_INSTANCE_ID | Instance ID | "" |
Postgres Database
| Variable | Description | Default |
|---|---|---|
| MG_POSTGRES_HOST | PostgreSQL host | postgres |
| MG_POSTGRES_PORT | PostgreSQL port | 5432 |
| MG_POSTGRES_USER | PostgreSQL user | magistrala |
| MG_POSTGRES_PASS | PostgreSQL password | magistrala |
| MG_POSTGRES_NAME | PostgreSQL database name | messages |
| MG_POSTGRES_SSL_MODE | PostgreSQL SSL mode | disable |
| MG_POSTGRES_SSL_CERT | PostgreSQL SSL client cert | "" |
| MG_POSTGRES_SSL_KEY | PostgreSQL SSL client key | "" |
| MG_POSTGRES_SSL_ROOT_CERT | PostgreSQL SSL root cert | "" |
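To make the role of the database variables concrete, here is a minimal Python sketch that assembles a libpq-style connection string from them, falling back to the defaults listed above. It is only an illustration: the writer itself is a Go service, and the helper name `postgres_dsn` and the `POSTGRES_DEFAULTS` mapping are hypothetical, not part of Magistrala.

```python
import os

# Defaults copied from the "Postgres Database" table above.
POSTGRES_DEFAULTS = {
    "MG_POSTGRES_HOST": "postgres",
    "MG_POSTGRES_PORT": "5432",
    "MG_POSTGRES_USER": "magistrala",
    "MG_POSTGRES_PASS": "magistrala",
    "MG_POSTGRES_NAME": "messages",
    "MG_POSTGRES_SSL_MODE": "disable",
}

# Optional TLS variables and the libpq keyword each one maps to.
OPTIONAL_SSL = (
    ("MG_POSTGRES_SSL_CERT", "sslcert"),
    ("MG_POSTGRES_SSL_KEY", "sslkey"),
    ("MG_POSTGRES_SSL_ROOT_CERT", "sslrootcert"),
)


def postgres_dsn(env=None):
    """Build a libpq keyword/value connection string (hypothetical helper).

    Values containing spaces would need quoting; this sketch ignores that.
    """
    env = os.environ if env is None else env

    def get(key):
        # An unset or empty variable falls back to the documented default.
        return env.get(key) or POSTGRES_DEFAULTS[key]

    parts = [
        f"host={get('MG_POSTGRES_HOST')}",
        f"port={get('MG_POSTGRES_PORT')}",
        f"user={get('MG_POSTGRES_USER')}",
        f"password={get('MG_POSTGRES_PASS')}",
        f"dbname={get('MG_POSTGRES_NAME')}",
        f"sslmode={get('MG_POSTGRES_SSL_MODE')}",
    ]
    for env_key, dsn_key in OPTIONAL_SSL:
        value = env.get(env_key, "")
        if value:  # the TLS paths default to "" and are skipped when empty
            parts.append(f"{dsn_key}={value}")
    return " ".join(parts)
```

Calling `postgres_dsn({"MG_POSTGRES_HOST": "localhost"})` overrides only the host, which mirrors how the local run commands later in this document change a few variables and leave the rest at their defaults.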
Postgres Message broker and observability
| Variable | Description | Default |
|---|---|---|
| MG_MESSAGE_BROKER_URL | Message broker URL | nats://nats:4222 |
| MG_JAEGER_URL | Jaeger collector endpoint | http://jaeger:4318/v1/traces |
| MG_JAEGER_TRACE_RATIO | Trace sampling ratio | 1.0 |
| MG_SEND_TELEMETRY | Send telemetry to Magistrala call-home server | true |
Timescale writer
Timescale Service endpoints
| Variable | Description | Default |
|---|---|---|
| MG_TIMESCALE_WRITER_LOG_LEVEL | Service log level | debug |
| MG_TIMESCALE_WRITER_CONFIG_PATH | Config file path (topics/transformer) | /config.toml |
| MG_TIMESCALE_WRITER_HTTP_HOST | HTTP host | timescale-writer |
| MG_TIMESCALE_WRITER_HTTP_PORT | HTTP port | 9012 |
| MG_TIMESCALE_WRITER_HTTP_SERVER_CERT | HTTPS server certificate path | "" |
| MG_TIMESCALE_WRITER_HTTP_SERVER_KEY | HTTPS server key path | "" |
| MG_TIMESCALE_WRITER_INSTANCE_ID | Instance ID | "" |
Timescale Database
| Variable | Description | Default |
|---|---|---|
| MG_TIMESCALE_HOST | TimescaleDB host | timescale |
| MG_TIMESCALE_PORT | TimescaleDB port | 5432 |
| MG_TIMESCALE_USER | TimescaleDB user | magistrala |
| MG_TIMESCALE_PASS | TimescaleDB password | magistrala |
| MG_TIMESCALE_NAME | TimescaleDB database name | magistrala |
| MG_TIMESCALE_SSL_MODE | TimescaleDB SSL mode | disable |
| MG_TIMESCALE_SSL_CERT | TimescaleDB SSL client cert | "" |
| MG_TIMESCALE_SSL_KEY | TimescaleDB SSL client key | "" |
| MG_TIMESCALE_SSL_ROOT_CERT | TimescaleDB SSL root cert | "" |
Timescale Message broker and observability
Timescale writer uses the same broker and telemetry variables listed for Postgres writer.
Writer config file
Both writers read a config file defined by *_WRITER_CONFIG_PATH. The default add-on config files are:
- docker/addons/postgres-writer/config.toml
- docker/addons/timescale-writer/config.toml
The config file controls subscription topics and optional transformer settings for both writers. The default Timescale add-on config omits the transformer section and relies on the built-in defaults. A config file with both sections looks like this:
["subscriber"]
topics = ["writers/#"]
[transformer]
format = "senml"
content_type = "application/senml+json"
time_fields = [
{ field_name = "seconds_key", field_format = "unix", location = "UTC" },
{ field_name = "millis_key", field_format = "unix_ms", location = "UTC" },
{ field_name = "micros_key", field_format = "unix_us", location = "UTC" },
{ field_name = "nanos_key", field_format = "unix_ns", location = "UTC" }
]
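The `field_format` values in `time_fields` name Unix-time units (seconds, milliseconds, microseconds, nanoseconds). As a rough illustration of what such a setting implies, the Python sketch below normalizes a timestamp found in a JSON payload to nanoseconds. The lookup rule (first configured field present in the payload wins) and the helper names are assumptions made for this example, not the documented behavior of the Magistrala transformer, and the `location` setting is ignored here.

```python
# Nanoseconds per unit for each field_format value used in the config above.
NANOS_PER_UNIT = {
    "unix": 1_000_000_000,  # seconds
    "unix_ms": 1_000_000,   # milliseconds
    "unix_us": 1_000,       # microseconds
    "unix_ns": 1,           # nanoseconds
}

# Mirrors the time_fields array from the example config ("location" omitted).
TIME_FIELDS = [
    {"field_name": "seconds_key", "field_format": "unix"},
    {"field_name": "millis_key", "field_format": "unix_ms"},
    {"field_name": "micros_key", "field_format": "unix_us"},
    {"field_name": "nanos_key", "field_format": "unix_ns"},
]


def to_unix_nanos(value, field_format):
    """Convert an integer Unix timestamp in the given unit to nanoseconds."""
    return value * NANOS_PER_UNIT[field_format]


def extract_time_nanos(payload, time_fields=TIME_FIELDS):
    """Return the timestamp of the first configured field present in payload.

    Returns None when the payload carries none of the configured fields.
    Hypothetical lookup rule, chosen only for illustration.
    """
    for field in time_fields:
        if field["field_name"] in payload:
            return to_unix_nanos(payload[field["field_name"]], field["field_format"])
    return None
```

With this sketch, `{"millis_key": 1700000000123}` and `{"nanos_key": 1700000000123000000}` normalize to the same value, which is the point of declaring a format per field.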
The topic filter uses slash-delimited MQTT-style syntax (+, #) in the config file for both backends. Writers do not expose broker mode, delivery policy, or consumer-group settings in this file. They always consume through the stream-backed broker adapter in consumers/writers/brokers:
- NATS builds use JetStream streams with durable consumers.
- FluxMQ builds publish to and consume from the writers stream queue while preserving the same writers/# config syntax.
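The MQTT-style wildcards mentioned above follow the usual rules: `+` matches exactly one topic level, and `#` matches any number of trailing levels and must be the last level of the filter. The following Python sketch shows that matching logic in isolation. The function name `topic_matches` is hypothetical, and the writers delegate the actual matching to the broker, so treat this as a way to reason about a filter rather than as Magistrala code.

```python
def topic_matches(topic_filter, topic):
    """Return True if a slash-delimited topic matches an MQTT-style filter."""
    filter_parts = topic_filter.split("/")
    topic_parts = topic.split("/")

    for i, part in enumerate(filter_parts):
        if part == "#":
            # '#' is only valid as the final level; it then matches the
            # parent level and everything below it.
            return i == len(filter_parts) - 1
        if i >= len(topic_parts):
            # The filter has more levels than the topic.
            return False
        if part != "+" and part != topic_parts[i]:
            # A literal level must match exactly; '+' accepts any one level.
            return False

    # Without a trailing '#', both must have the same number of levels.
    return len(filter_parts) == len(topic_parts)
```

For example, the default filter `writers/#` accepts `writers/ch1/room/temp`, while `writers/+/temp` accepts `writers/ch1/temp` but not `writers/ch1/room/temp`.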
Features
- Message persistence: Stores incoming SenML messages into PostgreSQL or TimescaleDB.
- JSON payload support: Saves JSON payloads into dynamically created tables.
- Stream-backed ingestion: Consumes through NATS JetStream durable consumers or FluxMQ stream queues.
- Configurable subscription: Limits ingestion to specific writers/<channel>/<subtopic> topics.
- Observability: Exposes /health and /metrics endpoints, with Jaeger tracing.
Architecture
Runtime flow
- The rules engine publishes writer messages under writers/<channel>/<subtopic>.
- The writer loads config.toml to select topic filters and transformer settings.
- The broker adapter consumes from the underlying stream-backed implementation.
- The consumer converts messages to SenML or JSON payloads.
- The repository writes records to the target database.
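The first step of the flow fixes the topic layout as `writers/<channel>/<subtopic>`, and the channel and subtopic later appear as columns in the stored records. As a small sketch of that relationship, the Python helper below splits such a topic into its two parts. The name `parse_writer_topic` is hypothetical, and keeping the subtopic slash-delimited is an assumption; the form in which the writers actually store subtopics is not stated here.

```python
def parse_writer_topic(topic):
    """Split 'writers/<channel>/<subtopic>' into (channel, subtopic).

    The subtopic may span several levels or be empty. Raises ValueError
    for topics that do not start with 'writers/<channel>'.
    """
    parts = topic.split("/")
    if len(parts) < 2 or parts[0] != "writers" or not parts[1]:
        raise ValueError(f"not a writer topic: {topic!r}")
    channel = parts[1]
    subtopic = "/".join(parts[2:])  # empty string when no subtopic is given
    return channel, subtopic
```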
Components
- Message broker adapter: consumers/writers/brokers (NATS JetStream or FluxMQ stream queues).
- Writer services: consumers/writers/postgres and consumers/writers/timescale.
- HTTP API: consumers/writers/api exposes /health and /metrics.
- Migrations: consumers/writers/*/init.go defines the schema and indexes.
PostgreSQL schema (SenML messages)
Defined in consumers/writers/postgres/init.go:
| Column | Type | Description |
|---|---|---|
| id | UUID | Message ID |
| channel | UUID | Channel ID |
| subtopic | VARCHAR(254) | Subtopic |
| publisher | UUID | Publisher ID |
| protocol | TEXT | Protocol name |
| name | TEXT | SenML name |
| unit | TEXT | SenML unit |
| value | FLOAT | Numeric value |
| string_value | TEXT | String value |
| bool_value | BOOL | Boolean value |
| data_value | BYTEA | Data value |
| sum | FLOAT | Sum value |
| time | FLOAT | Measurement time |
| update_time | FLOAT | Update time |
Primary key: (time, publisher, subtopic, name)
TimescaleDB schema (SenML messages)
Defined in consumers/writers/timescale/init.go:
| Column | Type | Description |
|---|---|---|
| time | BIGINT | Measurement time |
| channel | UUID | Channel ID |
| subtopic | VARCHAR(254) | Subtopic |
| publisher | VARCHAR(254) | Publisher ID |
| protocol | TEXT | Protocol name |
| name | VARCHAR(254) | SenML name |
| unit | TEXT | SenML unit |
| value | FLOAT | Numeric value |
| string_value | TEXT | String value |
| bool_value | BOOL | Boolean value |
| data_value | BYTEA | Data value |
| sum | FLOAT | Sum value |
| update_time | FLOAT | Update time |
Primary key: (time, channel, subtopic, protocol, publisher, name)
Timescale writer creates a hypertable on messages and adds time-series indexes for common query paths.
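To show how the SenML columns above might be read back, here is a hedged Python sketch that builds a parameterized time-range query against the messages table. It only constructs the SQL text and parameter list. The `%s` placeholder style is the one used by psycopg-like drivers, the function name `build_range_query` is hypothetical, and applying the same table name to the Postgres writer is an assumption. The `start` and `end` bounds must be in whatever unit the `time` column holds, which this document does not specify.

```python
def build_range_query(channel, start, end, name=None, limit=100):
    """Return (sql, params) selecting SenML rows for one channel and time range.

    Hypothetical helper: values are passed as parameters rather than
    interpolated into the SQL string.
    """
    sql = (
        "SELECT time, publisher, subtopic, name, unit, value "
        "FROM messages "
        "WHERE channel = %s AND time >= %s AND time < %s"
    )
    params = [channel, start, end]

    if name is not None:
        # Optionally narrow the result to a single SenML name.
        sql += " AND name = %s"
        params.append(name)

    sql += " ORDER BY time DESC LIMIT %s"
    params.append(limit)
    return sql, params
```

A driver cursor could then run it as `cursor.execute(*build_range_query(channel_id, start, end))`, assuming a driver that accepts `%s` placeholders.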
JSON payload tables (dynamic)
If the transformer emits JSON payloads, the writers create a table named after the payload format:
Postgres JSON table:
id UUID, created BIGINT, channel VARCHAR(254), subtopic VARCHAR(254), publisher VARCHAR(254), protocol TEXT, payload JSONB (PK: id)
Timescale JSON table:
created BIGINT, channel VARCHAR(254), subtopic VARCHAR(254), publisher VARCHAR(254), protocol TEXT, payload JSONB (PK: created, publisher, subtopic)
Deployment
Build and run locally
Postgres writer:
make postgres-writer
MG_POSTGRES_WRITER_LOG_LEVEL=debug \
MG_POSTGRES_WRITER_CONFIG_PATH=./docker/addons/postgres-writer/config.toml \
MG_POSTGRES_WRITER_HTTP_PORT=9007 \
MG_POSTGRES_HOST=localhost \
MG_POSTGRES_PORT=5432 \
MG_POSTGRES_USER=magistrala \
MG_POSTGRES_PASS=magistrala \
MG_POSTGRES_NAME=messages \
MG_MESSAGE_BROKER_URL=nats://localhost:4222 \
MG_JAEGER_URL=http://localhost:4318/v1/traces \
./build/postgres-writer
Timescale writer:
make timescale-writer
MG_TIMESCALE_WRITER_LOG_LEVEL=debug \
MG_TIMESCALE_WRITER_CONFIG_PATH=./docker/addons/timescale-writer/config.toml \
MG_TIMESCALE_WRITER_HTTP_PORT=9012 \
MG_TIMESCALE_HOST=localhost \
MG_TIMESCALE_PORT=5432 \
MG_TIMESCALE_USER=magistrala \
MG_TIMESCALE_PASS=magistrala \
MG_TIMESCALE_NAME=magistrala \
MG_MESSAGE_BROKER_URL=nats://localhost:4222 \
MG_JAEGER_URL=http://localhost:4318/v1/traces \
./build/timescale-writer
Docker Compose
Postgres writer add-on:
docker compose -f docker/docker-compose.yaml -f docker/addons/postgres-writer/docker-compose.yaml up
Timescale writer add-on:
docker compose -f docker/docker-compose.yaml -f docker/addons/timescale-writer/docker-compose.yaml up
Health check
curl -X GET http://localhost:9007/health \
-H "accept: application/health+json"
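The same check can be scripted. The Python sketch below builds the equivalent request with the standard library and treats an HTTP 200 response as healthy. Port 9007 is the Postgres writer default and 9012 the Timescale writer default from the configuration tables. The helper names are hypothetical, and equating "healthy" with status 200 is an assumption, since the response body format is not described here.

```python
import urllib.request


def health_request(host="localhost", port=9007):
    """Build the GET /health request used by the curl example above."""
    return urllib.request.Request(
        f"http://{host}:{port}/health",
        headers={"accept": "application/health+json"},
        method="GET",
    )


def is_healthy(host="localhost", port=9007, timeout=5.0):
    """Return True if the writer answers /health with HTTP 200.

    Connection errors and non-2xx responses surface as exceptions from
    urllib (URLError / HTTPError); callers may want to catch them.
    """
    with urllib.request.urlopen(health_request(host, port), timeout=timeout) as resp:
        return resp.status == 200
```

For the Timescale writer, call `is_healthy(port=9012)`.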
Testing
go test ./consumers/writers/...
Usage
Writers do not expose a message ingestion API. Messages are published to the message broker, and writers consume them through the stream-backed broker adapter. The HTTP API provides only health and metrics endpoints.
| Endpoint | Description |
|---|---|
| GET /health | Service health check |
| GET /metrics | Prometheus metrics |
For an in-depth explanation of Writers, see the official documentation.